Browse Source

Merge branch 'master' into ToObservableImprovements

David Karnok 7 years ago
parent
commit
32ab19b816
33 changed files with 276 additions and 243 deletions
  1. 1 0
      Rx.NET/Source/benchmarks/Benchmarks.System.Reactive/Program.cs
  2. 26 0
      Rx.NET/Source/benchmarks/Benchmarks.System.Reactive/RangeBenchmark.cs
  3. 0 1
      Rx.NET/Source/src/System.Reactive/Linq/Observable/Buffer.cs
  4. 18 28
      Rx.NET/Source/src/System.Reactive/Linq/Observable/Collect.cs
  5. 4 1
      Rx.NET/Source/src/System.Reactive/Linq/Observable/ElementAt.cs
  6. 10 2
      Rx.NET/Source/src/System.Reactive/Linq/Observable/FirstAsync.cs
  7. 0 5
      Rx.NET/Source/src/System.Reactive/Linq/Observable/ForEach.cs
  8. 54 47
      Rx.NET/Source/src/System.Reactive/Linq/Observable/Generate.cs
  9. 3 4
      Rx.NET/Source/src/System.Reactive/Linq/Observable/Latest.cs
  10. 0 18
      Rx.NET/Source/src/System.Reactive/Linq/Observable/Max.cs
  11. 0 18
      Rx.NET/Source/src/System.Reactive/Linq/Observable/Min.cs
  12. 3 4
      Rx.NET/Source/src/System.Reactive/Linq/Observable/MostRecent.cs
  13. 3 4
      Rx.NET/Source/src/System.Reactive/Linq/Observable/Next.cs
  14. 11 12
      Rx.NET/Source/src/System.Reactive/Linq/Observable/PushToPullAdapter.cs
  15. 76 30
      Rx.NET/Source/src/System.Reactive/Linq/Observable/Range.cs
  16. 0 2
      Rx.NET/Source/src/System.Reactive/Linq/Observable/Scan.cs
  17. 0 1
      Rx.NET/Source/src/System.Reactive/Linq/Observable/Select.cs
  18. 6 9
      Rx.NET/Source/src/System.Reactive/Linq/Observable/SequenceEqual.cs
  19. 0 5
      Rx.NET/Source/src/System.Reactive/Linq/Observable/SingleAsync.cs
  20. 0 5
      Rx.NET/Source/src/System.Reactive/Linq/Observable/SingleOrDefaultAsync.cs
  21. 0 5
      Rx.NET/Source/src/System.Reactive/Linq/Observable/SkipUntil.cs
  22. 0 1
      Rx.NET/Source/src/System.Reactive/Linq/Observable/SkipWhile.cs
  23. 2 12
      Rx.NET/Source/src/System.Reactive/Linq/Observable/Sum.cs
  24. 4 4
      Rx.NET/Source/src/System.Reactive/Linq/Observable/TakeLastBuffer.cs
  25. 0 1
      Rx.NET/Source/src/System.Reactive/Linq/Observable/TakeWhile.cs
  26. 10 2
      Rx.NET/Source/src/System.Reactive/Linq/Observable/ToArray.cs
  27. 11 2
      Rx.NET/Source/src/System.Reactive/Linq/Observable/ToDictionary.cs
  28. 10 2
      Rx.NET/Source/src/System.Reactive/Linq/Observable/ToList.cs
  29. 11 2
      Rx.NET/Source/src/System.Reactive/Linq/Observable/ToLookup.cs
  30. 2 3
      Rx.NET/Source/src/System.Reactive/Linq/Observable/Where.cs
  31. 0 4
      Rx.NET/Source/src/System.Reactive/Linq/Observable/Window.cs
  32. 5 8
      Rx.NET/Source/src/System.Reactive/Linq/Observable/Zip.cs
  33. 6 1
      Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Creation.cs

+ 1 - 0
Rx.NET/Source/benchmarks/Benchmarks.System.Reactive/Program.cs

@@ -16,6 +16,7 @@ namespace Benchmarks.System.Reactive
                 typeof(CombineLatestBenchmark),
                 typeof(SwitchBenchmark),
                 typeof(BufferCountBenchmark),
+                typeof(RangeBenchmark),
                 typeof(ToObservableBenchmark)
             });
 

+ 26 - 0
Rx.NET/Source/benchmarks/Benchmarks.System.Reactive/RangeBenchmark.cs

@@ -0,0 +1,26 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the Apache 2.0 License.
+// See the LICENSE file in the project root for more information.
+
+using System;
+using System.Reactive.Linq;
+using System.Threading;
+using BenchmarkDotNet.Attributes;
+
+namespace Benchmarks.System.Reactive
+{
+    [MemoryDiagnoser]
+    public class RangeBenchmark
+    {
+        [Params(1, 10, 100, 1000, 10000, 100000, 1000000)]
+        public int N;
+
+        int _store;
+
+        [Benchmark]
+        public void Range()
+        {
+            Observable.Range(1, N).Subscribe(v => Volatile.Write(ref _store, v));
+        }
+    }
+}

+ 0 - 1
Rx.NET/Source/src/System.Reactive/Linq/Observable/Buffer.cs

@@ -190,7 +190,6 @@ namespace System.Reactive.Linq.ObservableImpl
                 private readonly int _count;
                 private readonly int _skip;
 
-                int _index;
                 int _n;
 
                 public OverlapSink(IObserver<IList<TSource>> observer, int count, int skip)

+ 18 - 28
Rx.NET/Source/src/System.Reactive/Linq/Observable/Collect.cs

@@ -18,50 +18,38 @@ namespace System.Reactive.Linq.ObservableImpl
             _getNewCollector = getNewCollector;
         }
 
-        protected override PushToPullSink<TSource, TResult> Run(IDisposable subscription)
-        {
-            var sink = new _(this, subscription);
-            sink.Initialize();
-            return sink;
-        }
+        protected override PushToPullSink<TSource, TResult> Run() => new _(_merge, _getNewCollector, _getInitialCollector());
 
         private sealed class _ : PushToPullSink<TSource, TResult>
         {
-            // CONSIDER: This sink has a parent reference that can be considered for removal.
+            readonly object _gate;
+            readonly Func<TResult, TSource, TResult> _merge;
+            readonly Func<TResult, TResult> _getNewCollector;
 
-            private readonly Collect<TSource, TResult> _parent;
-
-            public _(Collect<TSource, TResult> parent, IDisposable subscription)
-                : base(subscription)
+            public _(Func<TResult, TSource, TResult> merge, Func<TResult, TResult> getNewCollector, TResult collector)
             {
-                _parent = parent;
+                _gate = new object();
+                _merge = merge;
+                _getNewCollector = getNewCollector;
+                _collector = collector;
             }
 
-            private object _gate;
             private TResult _collector;
-            private bool _hasFailed;
             private Exception _error;
             private bool _hasCompleted;
             private bool _done;
 
-            public void Initialize()
-            {
-                _gate = new object();
-                _collector = _parent._getInitialCollector();
-            }
-
             public override void OnNext(TSource value)
             {
                 lock (_gate)
                 {
                     try
                     {
-                        _collector = _parent._merge(_collector, value);
+                        _collector = _merge(_collector, value);
                     }
                     catch (Exception ex)
                     {
                         _error = ex;
-                        _hasFailed = true;
 
                         Dispose();
                     }
@@ -75,7 +63,6 @@ namespace System.Reactive.Linq.ObservableImpl
                 lock (_gate)
                 {
                     _error = error;
-                    _hasFailed = true;
                 }
             }
 
@@ -93,10 +80,12 @@ namespace System.Reactive.Linq.ObservableImpl
             {
                 lock (_gate)
                 {
-                    if (_hasFailed)
+                    var error = _error;
+                    if (error != null)
                     {
-                        current = default(TResult);
-                        _error.Throw();
+                        current = default;
+                        _collector = default;
+                        error.Throw();
                     }
                     else
                     {
@@ -104,7 +93,8 @@ namespace System.Reactive.Linq.ObservableImpl
                         {
                             if (_done)
                             {
-                                current = default(TResult);
+                                current = default;
+                                _collector = default;
                                 return false;
                             }
 
@@ -117,7 +107,7 @@ namespace System.Reactive.Linq.ObservableImpl
 
                             try
                             {
-                                _collector = _parent._getNewCollector(current);
+                                _collector = _getNewCollector(current);
                             }
                             catch
                             {

+ 4 - 1
Rx.NET/Source/src/System.Reactive/Linq/Observable/ElementAt.cs

@@ -42,7 +42,10 @@ namespace System.Reactive.Linq.ObservableImpl
 
             public override void OnCompleted()
             {
-                ForwardOnError(new ArgumentOutOfRangeException("index"));
+                if (_i >= 0)
+                {
+                    ForwardOnError(new ArgumentOutOfRangeException("index"));
+                }
             }
         }
     }

+ 10 - 2
Rx.NET/Source/src/System.Reactive/Linq/Observable/FirstAsync.cs

@@ -21,6 +21,8 @@ namespace System.Reactive.Linq.ObservableImpl
 
             internal sealed class _ : IdentitySink<TSource>
             {
+                bool _found;
+
                 public _(IObserver<TSource> observer)
                     : base(observer)
                 {
@@ -28,13 +30,15 @@ namespace System.Reactive.Linq.ObservableImpl
 
                 public override void OnNext(TSource value)
                 {
+                    _found = true;
                     ForwardOnNext(value);
                     ForwardOnCompleted();
                 }
 
                 public override void OnCompleted()
                 {
-                    ForwardOnError(new InvalidOperationException(Strings_Linq.NO_ELEMENTS));
+                    if (!_found)
+                        ForwardOnError(new InvalidOperationException(Strings_Linq.NO_ELEMENTS));
                 }
             }
         }
@@ -58,6 +62,8 @@ namespace System.Reactive.Linq.ObservableImpl
             {
                 private readonly Func<TSource, bool> _predicate;
 
+                bool _found;
+
                 public _(Func<TSource, bool> predicate, IObserver<TSource> observer)
                     : base(observer)
                 {
@@ -80,6 +86,7 @@ namespace System.Reactive.Linq.ObservableImpl
 
                     if (b)
                     {
+                        _found = true;
                         ForwardOnNext(value);
                         ForwardOnCompleted();
                     }
@@ -87,7 +94,8 @@ namespace System.Reactive.Linq.ObservableImpl
 
                 public override void OnCompleted()
                 {
-                    ForwardOnError(new InvalidOperationException(Strings_Linq.NO_MATCHING_ELEMENTS));
+                    if (!_found)
+                        ForwardOnError(new InvalidOperationException(Strings_Linq.NO_MATCHING_ELEMENTS));
                 }
             }
         }

+ 0 - 5
Rx.NET/Source/src/System.Reactive/Linq/Observable/ForEach.cs

@@ -20,8 +20,6 @@ namespace System.Reactive.Linq.ObservableImpl
             {
                 _onNext = onNext;
                 _done = done;
-
-                _stopped = 0;
             }
 
             public Exception Error => _exception;
@@ -72,9 +70,6 @@ namespace System.Reactive.Linq.ObservableImpl
             {
                 _onNext = onNext;
                 _done = done;
-
-                _index = 0;
-                _stopped = 0;
             }
 
             public Exception Error => _exception;

+ 54 - 47
Rx.NET/Source/src/System.Reactive/Linq/Observable/Generate.cs

@@ -28,36 +28,38 @@ namespace System.Reactive.Linq.ObservableImpl
 
             protected override _ CreateSink(IObserver<TResult> observer) => new _(this, observer);
 
-            protected override void Run(_ sink) => sink.Run();
+            protected override void Run(_ sink) => sink.Run(_scheduler);
 
             internal sealed class _ : IdentitySink<TResult>
             {
-                // CONSIDER: This sink has a parent reference that can be considered for removal.
-
-                private readonly NoTime _parent;
+                readonly Func<TState, bool> _condition;
+                readonly Func<TState, TState> _iterate;
+                readonly Func<TState, TResult> _resultSelector;
 
                 public _(NoTime parent, IObserver<TResult> observer)
                     : base(observer)
                 {
-                    _parent = parent;
+                    _condition = parent._condition;
+                    _iterate = parent._iterate;
+                    _resultSelector = parent._resultSelector;
+
+                    _state = parent._initialState;
+                    _first = true;
                 }
 
                 private TState _state;
                 private bool _first;
 
-                public void Run()
+                public void Run(IScheduler _scheduler)
                 {
-                    _state = _parent._initialState;
-                    _first = true;
-
-                    var longRunning = _parent._scheduler.AsLongRunning();
+                    var longRunning = _scheduler.AsLongRunning();
                     if (longRunning != null)
                     {
                         SetUpstream(longRunning.ScheduleLongRunning(this, (@this, c) => @this.Loop(c)));
                     }
                     else
                     {
-                        SetUpstream(_parent._scheduler.Schedule(this, (@this, a) => @this.LoopRec(a)));
+                        SetUpstream(_scheduler.Schedule(this, (@this, a) => @this.LoopRec(a)));
                     }
                 }
 
@@ -75,14 +77,14 @@ namespace System.Reactive.Linq.ObservableImpl
                             }
                             else
                             {
-                                _state = _parent._iterate(_state);
+                                _state = _iterate(_state);
                             }
 
-                            hasResult = _parent._condition(_state);
+                            hasResult = _condition(_state);
 
                             if (hasResult)
                             {
-                                result = _parent._resultSelector(_state);
+                                result = _resultSelector(_state);
                             }
                         }
                         catch (Exception exception)
@@ -119,14 +121,14 @@ namespace System.Reactive.Linq.ObservableImpl
                         }
                         else
                         {
-                            _state = _parent._iterate(_state);
+                            _state = _iterate(_state);
                         }
 
-                        hasResult = _parent._condition(_state);
+                        hasResult = _condition(_state);
 
                         if (hasResult)
                         {
-                            result = _parent._resultSelector(_state);
+                            result = _resultSelector(_state);
                         }
                     }
                     catch (Exception exception)
@@ -169,31 +171,34 @@ namespace System.Reactive.Linq.ObservableImpl
 
             protected override _ CreateSink(IObserver<TResult> observer) => new _(this, observer);
 
-            protected override void Run(_ sink) => sink.Run();
+            protected override void Run(_ sink) => sink.Run(_scheduler, _initialState);
 
             internal sealed class _ : IdentitySink<TResult>
             {
-                // CONSIDER: This sink has a parent reference that can be considered for removal.
-
-                private readonly Absolute _parent;
+                readonly Func<TState, bool> _condition;
+                readonly Func<TState, TState> _iterate;
+                readonly Func<TState, TResult> _resultSelector;
+                readonly Func<TState, DateTimeOffset> _timeSelector;
 
                 public _(Absolute parent, IObserver<TResult> observer)
                     : base(observer)
                 {
-                    _parent = parent;
+                    _condition = parent._condition;
+                    _iterate = parent._iterate;
+                    _resultSelector = parent._resultSelector;
+                    _timeSelector = parent._timeSelector;
+
+                    _first = true;
                 }
 
                 private bool _first;
                 private bool _hasResult;
+
                 private TResult _result;
 
-                public void Run()
+                public void Run(IScheduler outerScheduler, TState initialState)
                 {
-                    _first = true;
-                    _hasResult = false;
-                    _result = default(TResult);
-
-                    SetUpstream(_parent._scheduler.Schedule((@this: this, _parent._initialState), (scheduler, tuple) => [email protected](scheduler, tuple._initialState)));
+                    SetUpstream(outerScheduler.Schedule((@this: this, initialState), (scheduler, tuple) => [email protected](scheduler, tuple.initialState)));
                 }
 
                 private IDisposable InvokeRec(IScheduler self, TState state)
@@ -213,15 +218,15 @@ namespace System.Reactive.Linq.ObservableImpl
                         }
                         else
                         {
-                            state = _parent._iterate(state);
+                            state = _iterate(state);
                         }
 
-                        _hasResult = _parent._condition(state);
+                        _hasResult = _condition(state);
 
                         if (_hasResult)
                         {
-                            _result = _parent._resultSelector(state);
-                            time = _parent._timeSelector(state);
+                            _result = _resultSelector(state);
+                            time = _timeSelector(state);
                         }
                     }
                     catch (Exception exception)
@@ -262,31 +267,33 @@ namespace System.Reactive.Linq.ObservableImpl
 
             protected override _ CreateSink(IObserver<TResult> observer) => new _(this, observer);
 
-            protected override void Run(_ sink) => sink.Run();
+            protected override void Run(_ sink) => sink.Run(_scheduler, _initialState);
 
             internal sealed class _ : IdentitySink<TResult>
             {
-                // CONSIDER: This sink has a parent reference that can be considered for removal.
-
-                private readonly Relative _parent;
+                private readonly Func<TState, bool> _condition;
+                private readonly Func<TState, TState> _iterate;
+                private readonly Func<TState, TResult> _resultSelector;
+                private readonly Func<TState, TimeSpan> _timeSelector;
 
                 public _(Relative parent, IObserver<TResult> observer)
                     : base(observer)
                 {
-                    _parent = parent;
+                    _condition = parent._condition;
+                    _iterate = parent._iterate;
+                    _resultSelector = parent._resultSelector;
+                    _timeSelector = parent._timeSelector;
+
+                    _first = true;
                 }
 
                 private bool _first;
                 private bool _hasResult;
                 private TResult _result;
 
-                public void Run()
+                public void Run(IScheduler outerScheduler, TState initialState)
                 {
-                    _first = true;
-                    _hasResult = false;
-                    _result = default(TResult);
-
-                    SetUpstream(_parent._scheduler.Schedule((@this: this, _parent._initialState), (scheduler, tuple) => [email protected](scheduler, tuple._initialState)));
+                    SetUpstream(outerScheduler.Schedule((@this: this, initialState), (scheduler, tuple) => [email protected](scheduler, tuple.initialState)));
                 }
 
                 private IDisposable InvokeRec(IScheduler self, TState state)
@@ -306,15 +313,15 @@ namespace System.Reactive.Linq.ObservableImpl
                         }
                         else
                         {
-                            state = _parent._iterate(state);
+                            state = _iterate(state);
                         }
 
-                        _hasResult = _parent._condition(state);
+                        _hasResult = _condition(state);
 
                         if (_hasResult)
                         {
-                            _result = _parent._resultSelector(state);
-                            time = _parent._timeSelector(state);
+                            _result = _resultSelector(state);
+                            time = _timeSelector(state);
                         }
                     }
                     catch (Exception exception)

+ 3 - 4
Rx.NET/Source/src/System.Reactive/Linq/Observable/Latest.cs

@@ -13,9 +13,9 @@ namespace System.Reactive.Linq.ObservableImpl
         {
         }
 
-        protected override PushToPullSink<TSource, TSource> Run(IDisposable subscription)
+        protected override PushToPullSink<TSource, TSource> Run()
         {
-            return new _(subscription);
+            return new _();
         }
 
         private sealed class _ : PushToPullSink<TSource, TSource>
@@ -23,8 +23,7 @@ namespace System.Reactive.Linq.ObservableImpl
             private readonly object _gate;
             private readonly SemaphoreSlim _semaphore;
 
-            public _(IDisposable subscription)
-                : base(subscription)
+            public _()
             {
                 _gate = new object();
                 _semaphore = new SemaphoreSlim(0, 1);

+ 0 - 18
Rx.NET/Source/src/System.Reactive/Linq/Observable/Max.cs

@@ -40,8 +40,6 @@ namespace System.Reactive.Linq.ObservableImpl
             public NonNull(IComparer<TSource> comparer, IObserver<TSource> observer)
                 : base(comparer, observer)
             {
-                _hasValue = false;
-                _lastValue = default(TSource);
             }
 
             public override void OnNext(TSource value)
@@ -93,7 +91,6 @@ namespace System.Reactive.Linq.ObservableImpl
             public Null(IComparer<TSource> comparer, IObserver<TSource> observer)
                 : base(comparer, observer)
             {
-                _lastValue = default(TSource);
             }
 
             public override void OnNext(TSource value)
@@ -160,8 +157,6 @@ namespace System.Reactive.Linq.ObservableImpl
             public _(IObserver<double> observer)
                 : base(observer)
             {
-                _hasValue = false;
-                _lastValue = default(double);
             }
 
             public override void OnNext(double value)
@@ -216,8 +211,6 @@ namespace System.Reactive.Linq.ObservableImpl
             public _(IObserver<float> observer)
                 : base(observer)
             {
-                _hasValue = false;
-                _lastValue = default(float);
             }
 
             public override void OnNext(float value)
@@ -272,8 +265,6 @@ namespace System.Reactive.Linq.ObservableImpl
             public _(IObserver<decimal> observer)
                 : base(observer)
             {
-                _hasValue = false;
-                _lastValue = default(decimal);
             }
 
             public override void OnNext(decimal value)
@@ -328,8 +319,6 @@ namespace System.Reactive.Linq.ObservableImpl
             public _(IObserver<int> observer)
                 : base(observer)
             {
-                _hasValue = false;
-                _lastValue = default(int);
             }
 
             public override void OnNext(int value)
@@ -384,8 +373,6 @@ namespace System.Reactive.Linq.ObservableImpl
             public _(IObserver<long> observer)
                 : base(observer)
             {
-                _hasValue = false;
-                _lastValue = default(long);
             }
 
             public override void OnNext(long value)
@@ -439,7 +426,6 @@ namespace System.Reactive.Linq.ObservableImpl
             public _(IObserver<double?> observer)
                 : base(observer)
             {
-                _lastValue = default(double?);
             }
 
             public override void OnNext(double? value)
@@ -488,7 +474,6 @@ namespace System.Reactive.Linq.ObservableImpl
             public _(IObserver<float?> observer)
                 : base(observer)
             {
-                _lastValue = default(float?);
             }
 
             public override void OnNext(float? value)
@@ -537,7 +522,6 @@ namespace System.Reactive.Linq.ObservableImpl
             public _(IObserver<decimal?> observer)
                 : base(observer)
             {
-                _lastValue = default(decimal?);
             }
 
             public override void OnNext(decimal? value)
@@ -586,7 +570,6 @@ namespace System.Reactive.Linq.ObservableImpl
             public _(IObserver<int?> observer)
                 : base(observer)
             {
-                _lastValue = default(int?);
             }
 
             public override void OnNext(int? value)
@@ -635,7 +618,6 @@ namespace System.Reactive.Linq.ObservableImpl
             public _(IObserver<long?> observer)
                 : base(observer)
             {
-                _lastValue = default(long?);
             }
 
             public override void OnNext(long? value)

+ 0 - 18
Rx.NET/Source/src/System.Reactive/Linq/Observable/Min.cs

@@ -40,8 +40,6 @@ namespace System.Reactive.Linq.ObservableImpl
             public NonNull(IComparer<TSource> comparer, IObserver<TSource> observer)
                 : base(comparer, observer)
             {
-                _hasValue = false;
-                _lastValue = default(TSource);
             }
 
             public override void OnNext(TSource value)
@@ -98,7 +96,6 @@ namespace System.Reactive.Linq.ObservableImpl
             public Null(IComparer<TSource> comparer, IObserver<TSource> observer)
                 : base(comparer, observer)
             {
-                _lastValue = default(TSource);
             }
 
             public override void OnNext(TSource value)
@@ -160,8 +157,6 @@ namespace System.Reactive.Linq.ObservableImpl
             public _(IObserver<double> observer)
                 : base(observer)
             {
-                _hasValue = false;
-                _lastValue = default(double);
             }
 
             public override void OnNext(double value)
@@ -216,8 +211,6 @@ namespace System.Reactive.Linq.ObservableImpl
             public _(IObserver<float> observer)
                 : base(observer)
             {
-                _hasValue = false;
-                _lastValue = default(float);
             }
 
             public override void OnNext(float value)
@@ -272,8 +265,6 @@ namespace System.Reactive.Linq.ObservableImpl
             public _(IObserver<decimal> observer)
                 : base(observer)
             {
-                _hasValue = false;
-                _lastValue = default(decimal);
             }
 
             public override void OnNext(decimal value)
@@ -328,8 +319,6 @@ namespace System.Reactive.Linq.ObservableImpl
             public _(IObserver<int> observer)
                 : base(observer)
             {
-                _hasValue = false;
-                _lastValue = default(int);
             }
 
             public override void OnNext(int value)
@@ -384,8 +373,6 @@ namespace System.Reactive.Linq.ObservableImpl
             public _(IObserver<long> observer)
                 : base(observer)
             {
-                _hasValue = false;
-                _lastValue = default(long);
             }
 
             public override void OnNext(long value)
@@ -439,7 +426,6 @@ namespace System.Reactive.Linq.ObservableImpl
             public _(IObserver<double?> observer)
                 : base(observer)
             {
-                _lastValue = default(double?);
             }
 
             public override void OnNext(double? value)
@@ -488,7 +474,6 @@ namespace System.Reactive.Linq.ObservableImpl
             public _(IObserver<float?> observer)
                 : base(observer)
             {
-                _lastValue = default(float?);
             }
 
             public override void OnNext(float? value)
@@ -537,7 +522,6 @@ namespace System.Reactive.Linq.ObservableImpl
             public _(IObserver<decimal?> observer)
                 : base(observer)
             {
-                _lastValue = default(decimal?);
             }
 
             public override void OnNext(decimal? value)
@@ -586,7 +570,6 @@ namespace System.Reactive.Linq.ObservableImpl
             public _(IObserver<int?> observer)
                 : base(observer)
             {
-                _lastValue = default(int?);
             }
 
             public override void OnNext(int? value)
@@ -635,7 +618,6 @@ namespace System.Reactive.Linq.ObservableImpl
             public _(IObserver<long?> observer)
                 : base(observer)
             {
-                _lastValue = default(long?);
             }
 
             public override void OnNext(long? value)

+ 3 - 4
Rx.NET/Source/src/System.Reactive/Linq/Observable/MostRecent.cs

@@ -14,15 +14,14 @@ namespace System.Reactive.Linq.ObservableImpl
             _initialValue = initialValue;
         }
 
-        protected override PushToPullSink<TSource, TSource> Run(IDisposable subscription)
+        protected override PushToPullSink<TSource, TSource> Run()
         {
-            return new _(_initialValue, subscription);
+            return new _(_initialValue);
         }
 
         private sealed class _ : PushToPullSink<TSource, TSource>
         {
-            public _(TSource initialValue, IDisposable subscription)
-                : base(subscription)
+            public _(TSource initialValue)
             {
                 _kind = NotificationKind.OnNext;
                 _value = initialValue;

+ 3 - 4
Rx.NET/Source/src/System.Reactive/Linq/Observable/Next.cs

@@ -13,9 +13,9 @@ namespace System.Reactive.Linq.ObservableImpl
         {
         }
 
-        protected override PushToPullSink<TSource, TSource> Run(IDisposable subscription)
+        protected override PushToPullSink<TSource, TSource> Run()
         {
-            return new _(subscription);
+            return new _();
         }
 
         private sealed class _ : PushToPullSink<TSource, TSource>
@@ -23,8 +23,7 @@ namespace System.Reactive.Linq.ObservableImpl
             private readonly object _gate;
             private readonly SemaphoreSlim _semaphore;
 
-            public _(IDisposable subscription)
-                : base(subscription)
+            public _()
             {
                 _gate = new object();
                 _semaphore = new SemaphoreSlim(0, 1);

+ 11 - 12
Rx.NET/Source/src/System.Reactive/Linq/Observable/PushToPullAdapter.cs

@@ -21,23 +21,17 @@ namespace System.Reactive.Linq.ObservableImpl
 
         public IEnumerator<TResult> GetEnumerator()
         {
-            var d = new SingleAssignmentDisposable();
-            var res = Run(d);
-            d.Disposable = _source.SubscribeSafe(res);
+            var res = Run();
+            res.SetUpstream(_source.SubscribeSafe(res));
             return res;
         }
 
-        protected abstract PushToPullSink<TSource, TResult> Run(IDisposable subscription);
+        protected abstract PushToPullSink<TSource, TResult> Run();
     }
 
     internal abstract class PushToPullSink<TSource, TResult> : IObserver<TSource>, IEnumerator<TResult>, IDisposable
     {
-        private readonly IDisposable _subscription;
-
-        public PushToPullSink(IDisposable subscription)
-        {
-            _subscription = subscription;
-        }
+        private IDisposable _upstream;
 
         public abstract void OnNext(TSource value);
         public abstract void OnError(Exception error);
@@ -59,7 +53,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 else
                 {
                     _done = true;
-                    _subscription.Dispose();
+                    Dispose();
                 }
             }
 
@@ -81,7 +75,12 @@ namespace System.Reactive.Linq.ObservableImpl
 
         public void Dispose()
         {
-            _subscription.Dispose();
+            Disposable.TryDispose(ref _upstream);
+        }
+
+        public void SetUpstream(IDisposable d)
+        {
+            Disposable.SetSingle(ref _upstream, d);
         }
     }
 }

+ 76 - 30
Rx.NET/Source/src/System.Reactive/Linq/Observable/Range.cs

@@ -7,71 +7,117 @@ using System.Reactive.Disposables;
 
 namespace System.Reactive.Linq.ObservableImpl
 {
-    internal sealed class Range : Producer<int, Range._>
+    internal sealed class RangeRecursive : Producer<int, RangeRecursive.RangeSink>
     {
         private readonly int _start;
         private readonly int _count;
         private readonly IScheduler _scheduler;
 
-        public Range(int start, int count, IScheduler scheduler)
+        public RangeRecursive(int start, int count, IScheduler scheduler)
         {
             _start = start;
             _count = count;
             _scheduler = scheduler;
         }
 
-        protected override _ CreateSink(IObserver<int> observer) => new _(this, observer);
+        protected override RangeSink CreateSink(IObserver<int> observer) => new RangeSink(_start, _count, observer);
 
-        protected override void Run(_ sink) => sink.Run(_scheduler);
+        protected override void Run(RangeSink sink) => sink.Run(_scheduler);
 
-        internal sealed class _ : IdentitySink<int>
+        internal sealed class RangeSink : IdentitySink<int>
         {
-            private readonly int _start;
-            private readonly int _count;
+            readonly int _end;
 
-            public _(Range parent, IObserver<int> observer)
+            int _index;
+
+            IDisposable _task;
+
+            public RangeSink(int start, int count, IObserver<int> observer)
                 : base(observer)
             {
-                _start = parent._start;
-                _count = parent._count;
+                _index = start;
+                _end = start + count;
             }
 
             public void Run(IScheduler scheduler)
             {
-                var longRunning = scheduler.AsLongRunning();
-                if (longRunning != null)
+                var first = scheduler.Schedule(this, (innerScheduler, @this) => @this.LoopRec(innerScheduler));
+                Disposable.TrySetSingle(ref _task, first);
+            }
+
+            protected override void Dispose(bool disposing)
+            {
+                base.Dispose(disposing);
+                if (disposing)
                 {
-                    SetUpstream(longRunning.ScheduleLongRunning(0, Loop));
+                    Disposable.TryDispose(ref _task);
                 }
-                else
+            }   
+
+            private IDisposable LoopRec(IScheduler scheduler)
+            {
+                var idx = _index;
+                if (idx != _end)
+                {
+                    _index = idx + 1;
+                    ForwardOnNext(idx);
+                    var next = scheduler.Schedule(this, (innerScheduler, @this) => @this.LoopRec(innerScheduler));
+                    Disposable.TrySetMultiple(ref _task, next);
+                } else
                 {
-                    SetUpstream(scheduler.Schedule(0, LoopRec));
+                    ForwardOnCompleted();
                 }
+                return Disposable.Empty;
             }
+        }
+    }
+
+    internal sealed class RangeLongRunning : Producer<int, RangeLongRunning.RangeSink>
+    {
+        private readonly int _start;
+        private readonly int _count;
+        private readonly ISchedulerLongRunning _scheduler;
 
-            private void Loop(int i, ICancelable cancel)
+        public RangeLongRunning(int start, int count, ISchedulerLongRunning scheduler)
+        {
+            _start = start;
+            _count = count;
+            _scheduler = scheduler;
+        }
+
+        protected override RangeSink CreateSink(IObserver<int> observer) => new RangeSink(_start, _count, observer);
+
+        protected override void Run(RangeSink sink) => sink.Run(_scheduler);
+
+        internal sealed class RangeSink : IdentitySink<int>
+        {
+            readonly int _end;
+
+            int _index;
+
+            public RangeSink(int start, int count, IObserver<int> observer)
+                : base(observer)
             {
-                while (!cancel.IsDisposed && i < _count)
-                {
-                    ForwardOnNext(_start + i);
-                    i++;
-                }
+                _index = start;
+                _end = start + count;
+            }
 
-                if (!cancel.IsDisposed)
-                    ForwardOnCompleted();
+            public void Run(ISchedulerLongRunning scheduler)
+            {
+                SetUpstream(scheduler.ScheduleLongRunning(this, (@this, cancel) => @this.Loop(cancel)));
             }
 
-            private void LoopRec(int i, Action<int> recurse)
+            private void Loop(ICancelable cancel)
             {
-                if (i < _count)
+                var idx = _index;
+                var end = _end;
+                while (!cancel.IsDisposed && idx != end)
                 {
-                    ForwardOnNext(_start + i);
-                    recurse(i + 1);
+                    ForwardOnNext(idx++);
                 }
-                else
-                {
+
+                if (!cancel.IsDisposed)
                     ForwardOnCompleted();
-                }
             }
         }
     }

+ 0 - 2
Rx.NET/Source/src/System.Reactive/Linq/Observable/Scan.cs

@@ -75,8 +75,6 @@ namespace System.Reactive.Linq.ObservableImpl
                 : base(observer)
             {
                 _accumulator = accumulator;
-                _accumulation = default(TSource);
-                _hasAccumulation = false;
             }
 
             public override void OnNext(TSource value)

+ 0 - 1
Rx.NET/Source/src/System.Reactive/Linq/Observable/Select.cs

@@ -73,7 +73,6 @@ namespace System.Reactive.Linq.ObservableImpl
                     : base(observer)
                 {
                     _selector = selector;
-                    _index = 0;
                 }
 
                 public override void OnNext(TSource value)

+ 6 - 9
Rx.NET/Source/src/System.Reactive/Linq/Observable/SequenceEqual.cs

@@ -30,29 +30,26 @@ namespace System.Reactive.Linq.ObservableImpl
             internal sealed class _ : IdentitySink<bool>
             {
                 private readonly IEqualityComparer<TSource> _comparer;
+                private readonly object _gate;
+                private readonly Queue<TSource> _ql;
+                private readonly Queue<TSource> _qr;
 
                 public _(IEqualityComparer<TSource> comparer, IObserver<bool> observer)
                     : base(observer)
                 {
                     _comparer = comparer;
+                    _gate = new object();
+                    _ql = new Queue<TSource>();
+                    _qr = new Queue<TSource>();
                 }
 
-                private object _gate;
                 private bool _donel;
                 private bool _doner;
-                private Queue<TSource> _ql;
-                private Queue<TSource> _qr;
 
                 private IDisposable _second;
 
                 public void Run(Observable parent)
                 {
-                    _gate = new object();
-                    _donel = false;
-                    _doner = false;
-                    _ql = new Queue<TSource>();
-                    _qr = new Queue<TSource>();
-
                     SetUpstream(parent._first.SubscribeSafe(new FirstObserver(this)));
                     Disposable.SetSingle(ref _second, parent._second.SubscribeSafe(new SecondObserver(this)));
                 }

+ 0 - 5
Rx.NET/Source/src/System.Reactive/Linq/Observable/SingleAsync.cs

@@ -27,8 +27,6 @@ namespace System.Reactive.Linq.ObservableImpl
                 public _(IObserver<TSource> observer)
                     : base(observer)
                 {
-                    _value = default(TSource);
-                    _seenValue = false;
                 }
 
                 public override void OnNext(TSource value)
@@ -83,9 +81,6 @@ namespace System.Reactive.Linq.ObservableImpl
                     : base(observer)
                 {
                     _predicate = predicate;
-
-                    _value = default(TSource);
-                    _seenValue = false;
                 }
 
                 public override void OnNext(TSource value)

+ 0 - 5
Rx.NET/Source/src/System.Reactive/Linq/Observable/SingleOrDefaultAsync.cs

@@ -27,8 +27,6 @@ namespace System.Reactive.Linq.ObservableImpl
                 public _(IObserver<TSource> observer)
                     : base(observer)
                 {
-                    _value = default(TSource);
-                    _seenValue = false;
                 }
 
                 public override void OnNext(TSource value)
@@ -76,9 +74,6 @@ namespace System.Reactive.Linq.ObservableImpl
                     : base(observer)
                 {
                     _predicate = predicate;
-
-                    _value = default(TSource);
-                    _seenValue = false;
                 }
 
                 public override void OnNext(TSource value)

+ 0 - 5
Rx.NET/Source/src/System.Reactive/Linq/Observable/SkipUntil.cs

@@ -114,11 +114,6 @@ namespace System.Reactive.Linq.ObservableImpl
         }
     }
 
-    internal static class SkipUntilTerminalException
-    {
-        internal static readonly Exception Instance = new Exception("No further exceptions");
-    }
-
     internal sealed class SkipUntil<TSource> : Producer<TSource, SkipUntil<TSource>._>
     {
         private readonly IObservable<TSource> _source;

+ 0 - 1
Rx.NET/Source/src/System.Reactive/Linq/Observable/SkipWhile.cs

@@ -85,7 +85,6 @@ namespace System.Reactive.Linq.ObservableImpl
                     : base(observer)
                 {
                     _predicate = predicate;
-                    _index = 0;
                 }
 
                 public override void OnNext(TSource value)

+ 2 - 12
Rx.NET/Source/src/System.Reactive/Linq/Observable/Sum.cs

@@ -24,7 +24,6 @@ namespace System.Reactive.Linq.ObservableImpl
             public _(IObserver<double> observer)
                 : base(observer)
             {
-                _sum = 0.0;
             }
 
             public override void OnNext(double value)
@@ -55,12 +54,11 @@ namespace System.Reactive.Linq.ObservableImpl
 
         internal sealed class _ : IdentitySink<float>
         {
-            private double _sum; // This is what LINQ to Objects does!
+            private double _sum; // This is what LINQ to Objects does (accumulates into double that is)!
 
             public _(IObserver<float> observer)
                 : base(observer)
             {
-                _sum = 0.0; // This is what LINQ to Objects does!
             }
 
             public override void OnNext(float value)
@@ -96,7 +94,6 @@ namespace System.Reactive.Linq.ObservableImpl
             public _(IObserver<decimal> observer)
                 : base(observer)
             {
-                _sum = 0M;
             }
 
             public override void OnNext(decimal value)
@@ -132,7 +129,6 @@ namespace System.Reactive.Linq.ObservableImpl
             public _(IObserver<int> observer)
                 : base(observer)
             {
-                _sum = 0;
             }
 
             public override void OnNext(int value)
@@ -178,7 +174,6 @@ namespace System.Reactive.Linq.ObservableImpl
             public _(IObserver<long> observer)
                 : base(observer)
             {
-                _sum = 0L;
             }
 
             public override void OnNext(long value)
@@ -224,7 +219,6 @@ namespace System.Reactive.Linq.ObservableImpl
             public _(IObserver<double?> observer)
                 : base(observer)
             {
-                _sum = 0.0;
             }
 
             public override void OnNext(double? value)
@@ -256,12 +250,11 @@ namespace System.Reactive.Linq.ObservableImpl
 
         internal sealed class _ : IdentitySink<float?>
         {
-            private double _sum; // This is what LINQ to Objects does!
+            private double _sum; // This is what LINQ to Objects does (accumulates into double that is)!
 
             public _(IObserver<float?> observer)
                 : base(observer)
             {
-                _sum = 0.0; // This is what LINQ to Objects does!
             }
 
             public override void OnNext(float? value)
@@ -298,7 +291,6 @@ namespace System.Reactive.Linq.ObservableImpl
             public _(IObserver<decimal?> observer)
                 : base(observer)
             {
-                _sum = 0M;
             }
 
             public override void OnNext(decimal? value)
@@ -335,7 +327,6 @@ namespace System.Reactive.Linq.ObservableImpl
             public _(IObserver<int?> observer)
                 : base(observer)
             {
-                _sum = 0;
             }
 
             public override void OnNext(int? value)
@@ -382,7 +373,6 @@ namespace System.Reactive.Linq.ObservableImpl
             public _(IObserver<long?> observer)
                 : base(observer)
             {
-                _sum = 0L;
             }
 
             public override void OnNext(long? value)

+ 4 - 4
Rx.NET/Source/src/System.Reactive/Linq/Observable/TakeLastBuffer.cs

@@ -70,7 +70,7 @@ namespace System.Reactive.Linq.ObservableImpl
 
             protected override _ CreateSink(IObserver<IList<TSource>> observer) => new _(_duration, observer);
 
-            protected override void Run(_ sink) => sink.Run(this);
+            protected override void Run(_ sink) => sink.Run(_source, _scheduler);
 
             internal sealed class _ : Sink<TSource, IList<TSource>> 
             {
@@ -86,11 +86,11 @@ namespace System.Reactive.Linq.ObservableImpl
 
                 private IStopwatch _watch;
 
-                public void Run(Time parent)
+                public void Run(IObservable<TSource> source, IScheduler scheduler)
                 {
-                    _watch = parent._scheduler.StartStopwatch();
+                    _watch = scheduler.StartStopwatch();
 
-                    SetUpstream(parent._source.SubscribeSafe(this));
+                    base.Run(source);
                 }
 
                 public override void OnNext(TSource value)

+ 0 - 1
Rx.NET/Source/src/System.Reactive/Linq/Observable/TakeWhile.cs

@@ -86,7 +86,6 @@ namespace System.Reactive.Linq.ObservableImpl
                 {
                     _predicate = predicate;
                     _running = true;
-                    _index = 0;
                 }
 
                 public override void OnNext(TSource value)

+ 10 - 2
Rx.NET/Source/src/System.Reactive/Linq/Observable/ToArray.cs

@@ -21,7 +21,7 @@ namespace System.Reactive.Linq.ObservableImpl
 
         internal sealed class _ : Sink<TSource, TSource[]> 
         {
-            private readonly List<TSource> _list;
+            private List<TSource> _list;
 
             public _(IObserver<TSource[]> observer)
                 : base(observer)
@@ -34,9 +34,17 @@ namespace System.Reactive.Linq.ObservableImpl
                 _list.Add(value);
             }
 
+            public override void OnError(Exception error)
+            {
+                _list = null;
+                base.OnError(error);
+            }
+
             public override void OnCompleted()
             {
-                ForwardOnNext(_list.ToArray());
+                var list = _list;
+                _list = null;
+                ForwardOnNext(list.ToArray());
                 ForwardOnCompleted();
             }
         }

+ 11 - 2
Rx.NET/Source/src/System.Reactive/Linq/Observable/ToDictionary.cs

@@ -29,7 +29,7 @@ namespace System.Reactive.Linq.ObservableImpl
         {
             private readonly Func<TSource, TKey> _keySelector;
             private readonly Func<TSource, TElement> _elementSelector;
-            private readonly Dictionary<TKey, TElement> _dictionary;
+            private Dictionary<TKey, TElement> _dictionary;
 
             public _(ToDictionary<TSource, TKey, TElement> parent, IObserver<IDictionary<TKey, TElement>> observer)
                 : base(observer)
@@ -47,13 +47,22 @@ namespace System.Reactive.Linq.ObservableImpl
                 }
                 catch (Exception ex)
                 {
+                    _dictionary = null;
                     ForwardOnError(ex);
                 }
             }
 
+            public override void OnError(Exception error)
+            {
+                _dictionary = null;
+                ForwardOnError(error);
+            }
+
             public override void OnCompleted()
             {
-                ForwardOnNext(_dictionary);
+                var dictionary = _dictionary;
+                _dictionary = null;
+                ForwardOnNext(dictionary);
                 ForwardOnCompleted();
             }
         }

+ 10 - 2
Rx.NET/Source/src/System.Reactive/Linq/Observable/ToList.cs

@@ -21,7 +21,7 @@ namespace System.Reactive.Linq.ObservableImpl
 
         internal sealed class _ : Sink<TSource, IList<TSource>> 
         {
-            private readonly List<TSource> _list;
+            private List<TSource> _list;
 
             public _(IObserver<IList<TSource>> observer)
                 : base(observer)
@@ -34,9 +34,17 @@ namespace System.Reactive.Linq.ObservableImpl
                 _list.Add(value);
             }
 
+            public override void OnError(Exception error)
+            {
+                _list = null;
+                ForwardOnError(error);
+            }
+
             public override void OnCompleted()
             {
-                ForwardOnNext(_list);
+                var list = _list;
+                _list = null;
+                ForwardOnNext(list);
                 ForwardOnCompleted();
             }
         }

+ 11 - 2
Rx.NET/Source/src/System.Reactive/Linq/Observable/ToLookup.cs

@@ -30,7 +30,7 @@ namespace System.Reactive.Linq.ObservableImpl
         {
             private readonly Func<TSource, TKey> _keySelector;
             private readonly Func<TSource, TElement> _elementSelector;
-            private readonly Lookup<TKey, TElement> _lookup;
+            private Lookup<TKey, TElement> _lookup;
 
             public _(ToLookup<TSource, TKey, TElement> parent, IObserver<ILookup<TKey, TElement>> observer)
                 : base(observer)
@@ -48,13 +48,22 @@ namespace System.Reactive.Linq.ObservableImpl
                 }
                 catch (Exception ex)
                 {
+                    _lookup = null;
                     ForwardOnError(ex);
                 }
             }
 
+            public override void OnError(Exception error)
+            {
+                _lookup = null;
+                ForwardOnError(error);
+            }
+
             public override void OnCompleted()
             {
-                ForwardOnNext(_lookup);
+                var lookup = _lookup;
+                _lookup = null;
+                ForwardOnNext(lookup);
                 ForwardOnCompleted();
             }
         }

+ 2 - 3
Rx.NET/Source/src/System.Reactive/Linq/Observable/Where.cs

@@ -38,7 +38,7 @@ namespace System.Reactive.Linq.ObservableImpl
 
                 public override void OnNext(TSource value)
                 {
-                    var shouldRun = default(bool);
+                    var shouldRun = false;
                     try
                     {
                         shouldRun = _predicate(value);
@@ -81,12 +81,11 @@ namespace System.Reactive.Linq.ObservableImpl
                     : base(observer)
                 {
                     _predicate = predicate;
-                    _index = 0;
                 }
 
                 public override void OnNext(TSource value)
                 {
-                    var shouldRun = default(bool);
+                    var shouldRun = false;
                     try
                     {
                         shouldRun = _predicate(value, checked(_index++));

+ 0 - 4
Rx.NET/Source/src/System.Reactive/Linq/Observable/Window.cs

@@ -50,8 +50,6 @@ namespace System.Reactive.Linq.ObservableImpl
 
                 public override void Run(IObservable<TSource> source)
                 {
-                    _n = 0;
-
                     var firstWindow = CreateWindow();
                     ForwardOnNext(firstWindow);
 
@@ -388,8 +386,6 @@ namespace System.Reactive.Linq.ObservableImpl
 
                 public override void Run(IObservable<TSource> source)
                 {
-                    _n = 0;
-
                     var groupDisposable = new CompositeDisposable(2) { _timerD };
                     _refCountDisposable = new RefCountDisposable(groupDisposable);
 

+ 5 - 8
Rx.NET/Source/src/System.Reactive/Linq/Observable/Zip.cs

@@ -551,21 +551,18 @@ namespace System.Reactive.Linq.ObservableImpl
             _sources = sources;
         }
 
-        protected override _ CreateSink(IObserver<IList<TSource>> observer) => new _(this, observer);
+        protected override _ CreateSink(IObserver<IList<TSource>> observer) => new _(observer);
 
-        protected override void Run(_ sink) => sink.Run();
+        protected override void Run(_ sink) => sink.Run(_sources);
 
         internal sealed class _ : IdentitySink<IList<TSource>>
         {
-            private readonly Zip<TSource> _parent;
-
             private readonly object _gate;
 
-            public _(Zip<TSource> parent, IObserver<IList<TSource>> observer)
+            public _(IObserver<IList<TSource>> observer)
                 : base(observer)
             {
                 _gate = new object();
-                _parent = parent;
             }
 
             private Queue<TSource>[] _queues;
@@ -574,9 +571,9 @@ namespace System.Reactive.Linq.ObservableImpl
 
             private static readonly IDisposable[] Disposed = new IDisposable[0];
 
-            public void Run()
+            public void Run(IEnumerable<IObservable<TSource>> sources)
             {
-                var srcs = _parent._sources.ToArray();
+                var srcs = sources.ToArray();
 
                 var N = srcs.Length;
 

+ 6 - 1
Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Creation.cs

@@ -338,7 +338,12 @@ namespace System.Reactive.Linq
 
         private static IObservable<int> Range_(int start, int count, IScheduler scheduler)
         {
-            return new Range(start, count, scheduler);
+            var longRunning = scheduler.AsLongRunning();
+            if (longRunning != null)
+            {
+                return new RangeLongRunning(start, count, longRunning);
+            }
+            return new RangeRecursive(start, count, scheduler);
         }
 
         #endregion