Parcourir la source

Merge branch 'master' into OneShotScheduleOperators

David Karnok il y a 7 ans
Parent
commit
7de5f747c1

+ 2 - 2
Rx.NET/Source/System.Reactive.sln

@@ -342,8 +342,8 @@ Global
 		{5C7906F6-232E-455C-9269-68EF84F393C9}.Debug|x64.Build.0 = Rx.net 4.0|Any CPU
 		{5C7906F6-232E-455C-9269-68EF84F393C9}.Debug|x86.ActiveCfg = Rx.net 4.0|Any CPU
 		{5C7906F6-232E-455C-9269-68EF84F393C9}.Debug|x86.Build.0 = Rx.net 4.0|Any CPU
-		{5C7906F6-232E-455C-9269-68EF84F393C9}.Release|Any CPU.ActiveCfg = Rx.net 4.0|Any CPU
-		{5C7906F6-232E-455C-9269-68EF84F393C9}.Release|Any CPU.Build.0 = Rx.net 4.0|Any CPU
+		{5C7906F6-232E-455C-9269-68EF84F393C9}.Release|Any CPU.ActiveCfg = Current Sources|Any CPU
+		{5C7906F6-232E-455C-9269-68EF84F393C9}.Release|Any CPU.Build.0 = Current Sources|Any CPU
 		{5C7906F6-232E-455C-9269-68EF84F393C9}.Release|ARM.ActiveCfg = Rx.net 4.0|Any CPU
 		{5C7906F6-232E-455C-9269-68EF84F393C9}.Release|ARM.Build.0 = Rx.net 4.0|Any CPU
 		{5C7906F6-232E-455C-9269-68EF84F393C9}.Release|x64.ActiveCfg = Rx.net 4.0|Any CPU

+ 2 - 0
Rx.NET/Source/benchmarks/Benchmarks.System.Reactive/Program.cs

@@ -25,6 +25,8 @@ namespace Benchmarks.System.Reactive
                 typeof(ComparisonBenchmark),
                 typeof(ComparisonAsyncBenchmark),
                 typeof(ScalarScheduleBenchmark)
+                typeof(StableCompositeDisposableBenchmark),
+                typeof(ComparisonAsyncBenchmark)
 #if (CURRENT)
                 ,typeof(AppendPrependBenchmark)
 #endif

+ 109 - 0
Rx.NET/Source/benchmarks/Benchmarks.System.Reactive/StableCompositeDisposableBenchmark.cs

@@ -0,0 +1,109 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the Apache 2.0 License.
+// See the LICENSE file in the project root for more information.
+
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Reactive.Disposables;
+using System.Reactive.Linq;
+using System.Threading;
+using BenchmarkDotNet.Attributes;
+
+namespace Benchmarks.System.Reactive
+{
+    [MemoryDiagnoser]
+    public class StableCompositeDisposableBenchmark
+    {
+        [Params(3, 4, 5, 6, 7, 8, 9, 10, 100)]
+        public int N;
+
+        private IDisposable[] _array;
+
+        private List<IDisposable> _list;
+
+        [GlobalSetup]
+        public void Setup()
+        {
+            _array = new IDisposable[N];
+            for (var i = 0; i < N; i++)
+            {
+                _array[i] = Disposable.Empty;
+            }
+            _list = new List<IDisposable>(_array);
+        }
+
+        [Benchmark]
+        public object Create_Array()
+        {
+            return StableCompositeDisposable.Create(_array);
+        }
+
+        [Benchmark]
+        public object Create_Trusted_Array()
+        {
+            return CreateTrusted(_array);
+        }
+
+        [Benchmark]
+        public object Create_List()
+        {
+            return StableCompositeDisposable.Create(_list);
+        }
+
+        [Benchmark]
+        public object Dispose_Array()
+        {
+            var scd = StableCompositeDisposable.Create(_array);
+            scd.Dispose();
+            return scd;
+        }
+
+        [Benchmark]
+        public object Dispose_List()
+        {
+            var scd = StableCompositeDisposable.Create(_list);
+            scd.Dispose();
+            return scd;
+        }
+
+        [Benchmark]
+        public object Dispose_Trused_Array()
+        {
+            var scd = CreateTrusted(_array);
+            scd.Dispose();
+            return scd;
+        }
+
+        // The StableCompositeDisposable.CreateTrusted is inaccessible and
+        // adding the InternalsVisibleTo attribute doesn't work (needs signed assemblies?)
+        internal static ICancelable CreateTrusted(params IDisposable[] disposables)
+        {
+            return new NAryTrustedArray(disposables);
+        }
+
+        private sealed class NAryTrustedArray : StableCompositeDisposable
+        {
+            private IDisposable[] _disposables;
+
+            public NAryTrustedArray(IDisposable[] disposables)
+            {
+                Volatile.Write(ref _disposables, disposables);
+            }
+
+            public override bool IsDisposed => Volatile.Read(ref _disposables) == null;
+
+            public override void Dispose()
+            {
+                var old = Interlocked.Exchange(ref _disposables, null);
+                if (old != null)
+                {
+                    foreach (var d in old)
+                    {
+                        d.Dispose();
+                    }
+                }
+            }
+        }
+    }
+}

+ 77 - 9
Rx.NET/Source/src/System.Reactive/Disposables/StableCompositeDisposable.cs

@@ -45,7 +45,19 @@ namespace System.Reactive.Disposables
                 throw new ArgumentNullException(nameof(disposables));
             }
 
-            return new NAry(disposables);
+            return new NAryArray(disposables);
+        }
+
+        /// <summary>
+        /// Creates a group of disposable resources that are disposed together
+        /// and without copying or checking for nulls inside the group.
+        /// </summary>
+        /// <param name="disposables">The array of disposables that is trusted
+        /// to not contain nulls and gives no need to defensively copy it.</param>
+        /// <returns>Group of disposable resources that are disposed together.</returns>
+        internal static ICancelable CreateTrusted(params IDisposable[] disposables)
+        {
+            return new NAryTrustedArray(disposables);
         }
 
         /// <summary>
@@ -60,7 +72,7 @@ namespace System.Reactive.Disposables
                 throw new ArgumentNullException(nameof(disposables));
             }
 
-            return new NAry(disposables);
+            return new NAryEnumerable(disposables);
         }
 
         /// <summary>
@@ -96,16 +108,11 @@ namespace System.Reactive.Disposables
             }
         }
 
-        private sealed class NAry : StableCompositeDisposable
+        private sealed class NAryEnumerable : StableCompositeDisposable
         {
             private volatile List<IDisposable> _disposables;
 
-            public NAry(IDisposable[] disposables)
-                : this((IEnumerable<IDisposable>)disposables)
-            {
-            }
-
-            public NAry(IEnumerable<IDisposable> disposables)
+            public NAryEnumerable(IEnumerable<IDisposable> disposables)
             {
                 _disposables = new List<IDisposable>(disposables);
 
@@ -132,5 +139,66 @@ namespace System.Reactive.Disposables
                 }
             }
         }
+
+        private sealed class NAryArray : StableCompositeDisposable
+        {
+            private IDisposable[] _disposables;
+
+            public NAryArray(IDisposable[] disposables)
+            {
+                var n = disposables.Length;
+                var ds = new IDisposable[n];
+                // These are likely already vectorized in the framework
+                // At least they are faster than loop-copying
+                Array.Copy(disposables, 0, ds, 0, n);
+                if (Array.IndexOf(ds, null) != -1)
+                {
+                    throw new ArgumentException(Strings_Core.DISPOSABLES_CANT_CONTAIN_NULL, nameof(disposables));
+                }
+                Volatile.Write(ref _disposables, ds);
+            }
+
+            public override bool IsDisposed => Volatile.Read(ref _disposables) == null;
+
+            public override void Dispose()
+            {
+                var old = Interlocked.Exchange(ref _disposables, null);
+                if (old != null)
+                {
+                    foreach (var d in old)
+                    {
+                        d.Dispose();
+                    }
+                }
+            }
+        }
+
+        /// <summary>
+        /// A stable composite that doesn't do defensive copy of
+        /// the input disposable array nor checks it for null.
+        /// </summary>
+        private sealed class NAryTrustedArray : StableCompositeDisposable
+        {
+            private IDisposable[] _disposables;
+
+            public NAryTrustedArray(IDisposable[] disposables)
+            {
+                Volatile.Write(ref _disposables, disposables);
+            }
+
+            public override bool IsDisposed => Volatile.Read(ref _disposables) == null;
+
+            public override void Dispose()
+            {
+                var old = Interlocked.Exchange(ref _disposables, null);
+                if (old != null)
+                {
+                    foreach (var d in old)
+                    {
+                        d.Dispose();
+                    }
+                }
+            }
+        }
     }
 }

+ 14 - 14
Rx.NET/Source/src/System.Reactive/Linq/Observable/CombineLatest.Generated.cs

@@ -57,7 +57,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 _observer2.SetResource(source2.SubscribeSafe(_observer2));
                 _observer3.SetResource(source3.SubscribeSafe(_observer3));
 
-                SetUpstream(StableCompositeDisposable.Create(subscriptions));
+                SetUpstream(StableCompositeDisposable.CreateTrusted(subscriptions));
             }
 
             protected override TResult GetResult() => _resultSelector(_observer1.Value, _observer2.Value, _observer3.Value);
@@ -114,7 +114,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 _observer3.SetResource(source3.SubscribeSafe(_observer3));
                 _observer4.SetResource(source4.SubscribeSafe(_observer4));
 
-                SetUpstream(StableCompositeDisposable.Create(subscriptions));
+                SetUpstream(StableCompositeDisposable.CreateTrusted(subscriptions));
             }
 
             protected override TResult GetResult() => _resultSelector(_observer1.Value, _observer2.Value, _observer3.Value, _observer4.Value);
@@ -176,7 +176,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 _observer4.SetResource(source4.SubscribeSafe(_observer4));
                 _observer5.SetResource(source5.SubscribeSafe(_observer5));
 
-                SetUpstream(StableCompositeDisposable.Create(subscriptions));
+                SetUpstream(StableCompositeDisposable.CreateTrusted(subscriptions));
             }
 
             protected override TResult GetResult() => _resultSelector(_observer1.Value, _observer2.Value, _observer3.Value, _observer4.Value, _observer5.Value);
@@ -243,7 +243,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 _observer5.SetResource(source5.SubscribeSafe(_observer5));
                 _observer6.SetResource(source6.SubscribeSafe(_observer6));
 
-                SetUpstream(StableCompositeDisposable.Create(subscriptions));
+                SetUpstream(StableCompositeDisposable.CreateTrusted(subscriptions));
             }
 
             protected override TResult GetResult() => _resultSelector(_observer1.Value, _observer2.Value, _observer3.Value, _observer4.Value, _observer5.Value, _observer6.Value);
@@ -315,7 +315,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 _observer6.SetResource(source6.SubscribeSafe(_observer6));
                 _observer7.SetResource(source7.SubscribeSafe(_observer7));
 
-                SetUpstream(StableCompositeDisposable.Create(subscriptions));
+                SetUpstream(StableCompositeDisposable.CreateTrusted(subscriptions));
             }
 
             protected override TResult GetResult() => _resultSelector(_observer1.Value, _observer2.Value, _observer3.Value, _observer4.Value, _observer5.Value, _observer6.Value, _observer7.Value);
@@ -392,7 +392,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 _observer7.SetResource(source7.SubscribeSafe(_observer7));
                 _observer8.SetResource(source8.SubscribeSafe(_observer8));
 
-                SetUpstream(StableCompositeDisposable.Create(subscriptions));
+                SetUpstream(StableCompositeDisposable.CreateTrusted(subscriptions));
             }
 
             protected override TResult GetResult() => _resultSelector(_observer1.Value, _observer2.Value, _observer3.Value, _observer4.Value, _observer5.Value, _observer6.Value, _observer7.Value, _observer8.Value);
@@ -474,7 +474,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 _observer8.SetResource(source8.SubscribeSafe(_observer8));
                 _observer9.SetResource(source9.SubscribeSafe(_observer9));
 
-                SetUpstream(StableCompositeDisposable.Create(subscriptions));
+                SetUpstream(StableCompositeDisposable.CreateTrusted(subscriptions));
             }
 
             protected override TResult GetResult() => _resultSelector(_observer1.Value, _observer2.Value, _observer3.Value, _observer4.Value, _observer5.Value, _observer6.Value, _observer7.Value, _observer8.Value, _observer9.Value);
@@ -561,7 +561,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 _observer9.SetResource(source9.SubscribeSafe(_observer9));
                 _observer10.SetResource(source10.SubscribeSafe(_observer10));
 
-                SetUpstream(StableCompositeDisposable.Create(subscriptions));
+                SetUpstream(StableCompositeDisposable.CreateTrusted(subscriptions));
             }
 
             protected override TResult GetResult() => _resultSelector(_observer1.Value, _observer2.Value, _observer3.Value, _observer4.Value, _observer5.Value, _observer6.Value, _observer7.Value, _observer8.Value, _observer9.Value, _observer10.Value);
@@ -653,7 +653,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 _observer10.SetResource(source10.SubscribeSafe(_observer10));
                 _observer11.SetResource(source11.SubscribeSafe(_observer11));
 
-                SetUpstream(StableCompositeDisposable.Create(subscriptions));
+                SetUpstream(StableCompositeDisposable.CreateTrusted(subscriptions));
             }
 
             protected override TResult GetResult() => _resultSelector(_observer1.Value, _observer2.Value, _observer3.Value, _observer4.Value, _observer5.Value, _observer6.Value, _observer7.Value, _observer8.Value, _observer9.Value, _observer10.Value, _observer11.Value);
@@ -750,7 +750,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 _observer11.SetResource(source11.SubscribeSafe(_observer11));
                 _observer12.SetResource(source12.SubscribeSafe(_observer12));
 
-                SetUpstream(StableCompositeDisposable.Create(subscriptions));
+                SetUpstream(StableCompositeDisposable.CreateTrusted(subscriptions));
             }
 
             protected override TResult GetResult() => _resultSelector(_observer1.Value, _observer2.Value, _observer3.Value, _observer4.Value, _observer5.Value, _observer6.Value, _observer7.Value, _observer8.Value, _observer9.Value, _observer10.Value, _observer11.Value, _observer12.Value);
@@ -852,7 +852,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 _observer12.SetResource(source12.SubscribeSafe(_observer12));
                 _observer13.SetResource(source13.SubscribeSafe(_observer13));
 
-                SetUpstream(StableCompositeDisposable.Create(subscriptions));
+                SetUpstream(StableCompositeDisposable.CreateTrusted(subscriptions));
             }
 
             protected override TResult GetResult() => _resultSelector(_observer1.Value, _observer2.Value, _observer3.Value, _observer4.Value, _observer5.Value, _observer6.Value, _observer7.Value, _observer8.Value, _observer9.Value, _observer10.Value, _observer11.Value, _observer12.Value, _observer13.Value);
@@ -959,7 +959,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 _observer13.SetResource(source13.SubscribeSafe(_observer13));
                 _observer14.SetResource(source14.SubscribeSafe(_observer14));
 
-                SetUpstream(StableCompositeDisposable.Create(subscriptions));
+                SetUpstream(StableCompositeDisposable.CreateTrusted(subscriptions));
             }
 
             protected override TResult GetResult() => _resultSelector(_observer1.Value, _observer2.Value, _observer3.Value, _observer4.Value, _observer5.Value, _observer6.Value, _observer7.Value, _observer8.Value, _observer9.Value, _observer10.Value, _observer11.Value, _observer12.Value, _observer13.Value, _observer14.Value);
@@ -1071,7 +1071,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 _observer14.SetResource(source14.SubscribeSafe(_observer14));
                 _observer15.SetResource(source15.SubscribeSafe(_observer15));
 
-                SetUpstream(StableCompositeDisposable.Create(subscriptions));
+                SetUpstream(StableCompositeDisposable.CreateTrusted(subscriptions));
             }
 
             protected override TResult GetResult() => _resultSelector(_observer1.Value, _observer2.Value, _observer3.Value, _observer4.Value, _observer5.Value, _observer6.Value, _observer7.Value, _observer8.Value, _observer9.Value, _observer10.Value, _observer11.Value, _observer12.Value, _observer13.Value, _observer14.Value, _observer15.Value);
@@ -1188,7 +1188,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 _observer15.SetResource(source15.SubscribeSafe(_observer15));
                 _observer16.SetResource(source16.SubscribeSafe(_observer16));
 
-                SetUpstream(StableCompositeDisposable.Create(subscriptions));
+                SetUpstream(StableCompositeDisposable.CreateTrusted(subscriptions));
             }
 
             protected override TResult GetResult() => _resultSelector(_observer1.Value, _observer2.Value, _observer3.Value, _observer4.Value, _observer5.Value, _observer6.Value, _observer7.Value, _observer8.Value, _observer9.Value, _observer10.Value, _observer11.Value, _observer12.Value, _observer13.Value, _observer14.Value, _observer15.Value, _observer16.Value);

+ 1 - 1
Rx.NET/Source/src/System.Reactive/Linq/Observable/CombineLatest.Generated.tt

@@ -96,7 +96,7 @@ for (var j = 1; j <= i; j++)
 }
 #>
 
-                SetUpstream(StableCompositeDisposable.Create(subscriptions));
+                SetUpstream(StableCompositeDisposable.CreateTrusted(subscriptions));
             }
 
             protected override TResult GetResult() => _resultSelector(<#=vs#>);

+ 1 - 1
Rx.NET/Source/src/System.Reactive/Linq/Observable/CombineLatest.cs

@@ -438,7 +438,7 @@ namespace System.Reactive.Linq.ObservableImpl
                     d.Disposable = srcs[j].SubscribeSafe(o);
                 }
 
-                SetUpstream(StableCompositeDisposable.Create(_subscriptions));
+                SetUpstream(StableCompositeDisposable.CreateTrusted(_subscriptions));
             }
 
             private void OnNext(int index, TSource value)

+ 1 - 1
Rx.NET/Source/src/System.Reactive/Linq/Observable/Latest.cs

@@ -116,7 +116,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 switch (kind)
                 {
                     case NotificationKind.OnNext:
-                        current = _value;
+                        current = value;
                         return true;
                     case NotificationKind.OnError:
                         error.Throw();

+ 14 - 14
Rx.NET/Source/src/System.Reactive/Linq/Observable/Zip.Generated.cs

@@ -65,7 +65,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 _observer2.SetResource(source2.SubscribeSafe(_observer2));
                 _observer3.SetResource(source3.SubscribeSafe(_observer3));
 
-                SetUpstream(StableCompositeDisposable.Create(disposables));
+                SetUpstream(StableCompositeDisposable.CreateTrusted(disposables));
             }
 
             protected override TResult GetResult() => _resultSelector(_observer1.Values.Dequeue(), _observer2.Values.Dequeue(), _observer3.Values.Dequeue());
@@ -133,7 +133,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 _observer3.SetResource(source3.SubscribeSafe(_observer3));
                 _observer4.SetResource(source4.SubscribeSafe(_observer4));
 
-                SetUpstream(StableCompositeDisposable.Create(disposables));
+                SetUpstream(StableCompositeDisposable.CreateTrusted(disposables));
             }
 
             protected override TResult GetResult() => _resultSelector(_observer1.Values.Dequeue(), _observer2.Values.Dequeue(), _observer3.Values.Dequeue(), _observer4.Values.Dequeue());
@@ -209,7 +209,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 _observer4.SetResource(source4.SubscribeSafe(_observer4));
                 _observer5.SetResource(source5.SubscribeSafe(_observer5));
 
-                SetUpstream(StableCompositeDisposable.Create(disposables));
+                SetUpstream(StableCompositeDisposable.CreateTrusted(disposables));
             }
 
             protected override TResult GetResult() => _resultSelector(_observer1.Values.Dequeue(), _observer2.Values.Dequeue(), _observer3.Values.Dequeue(), _observer4.Values.Dequeue(), _observer5.Values.Dequeue());
@@ -293,7 +293,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 _observer5.SetResource(source5.SubscribeSafe(_observer5));
                 _observer6.SetResource(source6.SubscribeSafe(_observer6));
 
-                SetUpstream(StableCompositeDisposable.Create(disposables));
+                SetUpstream(StableCompositeDisposable.CreateTrusted(disposables));
             }
 
             protected override TResult GetResult() => _resultSelector(_observer1.Values.Dequeue(), _observer2.Values.Dequeue(), _observer3.Values.Dequeue(), _observer4.Values.Dequeue(), _observer5.Values.Dequeue(), _observer6.Values.Dequeue());
@@ -385,7 +385,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 _observer6.SetResource(source6.SubscribeSafe(_observer6));
                 _observer7.SetResource(source7.SubscribeSafe(_observer7));
 
-                SetUpstream(StableCompositeDisposable.Create(disposables));
+                SetUpstream(StableCompositeDisposable.CreateTrusted(disposables));
             }
 
             protected override TResult GetResult() => _resultSelector(_observer1.Values.Dequeue(), _observer2.Values.Dequeue(), _observer3.Values.Dequeue(), _observer4.Values.Dequeue(), _observer5.Values.Dequeue(), _observer6.Values.Dequeue(), _observer7.Values.Dequeue());
@@ -485,7 +485,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 _observer7.SetResource(source7.SubscribeSafe(_observer7));
                 _observer8.SetResource(source8.SubscribeSafe(_observer8));
 
-                SetUpstream(StableCompositeDisposable.Create(disposables));
+                SetUpstream(StableCompositeDisposable.CreateTrusted(disposables));
             }
 
             protected override TResult GetResult() => _resultSelector(_observer1.Values.Dequeue(), _observer2.Values.Dequeue(), _observer3.Values.Dequeue(), _observer4.Values.Dequeue(), _observer5.Values.Dequeue(), _observer6.Values.Dequeue(), _observer7.Values.Dequeue(), _observer8.Values.Dequeue());
@@ -593,7 +593,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 _observer8.SetResource(source8.SubscribeSafe(_observer8));
                 _observer9.SetResource(source9.SubscribeSafe(_observer9));
 
-                SetUpstream(StableCompositeDisposable.Create(disposables));
+                SetUpstream(StableCompositeDisposable.CreateTrusted(disposables));
             }
 
             protected override TResult GetResult() => _resultSelector(_observer1.Values.Dequeue(), _observer2.Values.Dequeue(), _observer3.Values.Dequeue(), _observer4.Values.Dequeue(), _observer5.Values.Dequeue(), _observer6.Values.Dequeue(), _observer7.Values.Dequeue(), _observer8.Values.Dequeue(), _observer9.Values.Dequeue());
@@ -709,7 +709,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 _observer9.SetResource(source9.SubscribeSafe(_observer9));
                 _observer10.SetResource(source10.SubscribeSafe(_observer10));
 
-                SetUpstream(StableCompositeDisposable.Create(disposables));
+                SetUpstream(StableCompositeDisposable.CreateTrusted(disposables));
             }
 
             protected override TResult GetResult() => _resultSelector(_observer1.Values.Dequeue(), _observer2.Values.Dequeue(), _observer3.Values.Dequeue(), _observer4.Values.Dequeue(), _observer5.Values.Dequeue(), _observer6.Values.Dequeue(), _observer7.Values.Dequeue(), _observer8.Values.Dequeue(), _observer9.Values.Dequeue(), _observer10.Values.Dequeue());
@@ -833,7 +833,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 _observer10.SetResource(source10.SubscribeSafe(_observer10));
                 _observer11.SetResource(source11.SubscribeSafe(_observer11));
 
-                SetUpstream(StableCompositeDisposable.Create(disposables));
+                SetUpstream(StableCompositeDisposable.CreateTrusted(disposables));
             }
 
             protected override TResult GetResult() => _resultSelector(_observer1.Values.Dequeue(), _observer2.Values.Dequeue(), _observer3.Values.Dequeue(), _observer4.Values.Dequeue(), _observer5.Values.Dequeue(), _observer6.Values.Dequeue(), _observer7.Values.Dequeue(), _observer8.Values.Dequeue(), _observer9.Values.Dequeue(), _observer10.Values.Dequeue(), _observer11.Values.Dequeue());
@@ -965,7 +965,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 _observer11.SetResource(source11.SubscribeSafe(_observer11));
                 _observer12.SetResource(source12.SubscribeSafe(_observer12));
 
-                SetUpstream(StableCompositeDisposable.Create(disposables));
+                SetUpstream(StableCompositeDisposable.CreateTrusted(disposables));
             }
 
             protected override TResult GetResult() => _resultSelector(_observer1.Values.Dequeue(), _observer2.Values.Dequeue(), _observer3.Values.Dequeue(), _observer4.Values.Dequeue(), _observer5.Values.Dequeue(), _observer6.Values.Dequeue(), _observer7.Values.Dequeue(), _observer8.Values.Dequeue(), _observer9.Values.Dequeue(), _observer10.Values.Dequeue(), _observer11.Values.Dequeue(), _observer12.Values.Dequeue());
@@ -1105,7 +1105,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 _observer12.SetResource(source12.SubscribeSafe(_observer12));
                 _observer13.SetResource(source13.SubscribeSafe(_observer13));
 
-                SetUpstream(StableCompositeDisposable.Create(disposables));
+                SetUpstream(StableCompositeDisposable.CreateTrusted(disposables));
             }
 
             protected override TResult GetResult() => _resultSelector(_observer1.Values.Dequeue(), _observer2.Values.Dequeue(), _observer3.Values.Dequeue(), _observer4.Values.Dequeue(), _observer5.Values.Dequeue(), _observer6.Values.Dequeue(), _observer7.Values.Dequeue(), _observer8.Values.Dequeue(), _observer9.Values.Dequeue(), _observer10.Values.Dequeue(), _observer11.Values.Dequeue(), _observer12.Values.Dequeue(), _observer13.Values.Dequeue());
@@ -1253,7 +1253,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 _observer13.SetResource(source13.SubscribeSafe(_observer13));
                 _observer14.SetResource(source14.SubscribeSafe(_observer14));
 
-                SetUpstream(StableCompositeDisposable.Create(disposables));
+                SetUpstream(StableCompositeDisposable.CreateTrusted(disposables));
             }
 
             protected override TResult GetResult() => _resultSelector(_observer1.Values.Dequeue(), _observer2.Values.Dequeue(), _observer3.Values.Dequeue(), _observer4.Values.Dequeue(), _observer5.Values.Dequeue(), _observer6.Values.Dequeue(), _observer7.Values.Dequeue(), _observer8.Values.Dequeue(), _observer9.Values.Dequeue(), _observer10.Values.Dequeue(), _observer11.Values.Dequeue(), _observer12.Values.Dequeue(), _observer13.Values.Dequeue(), _observer14.Values.Dequeue());
@@ -1409,7 +1409,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 _observer14.SetResource(source14.SubscribeSafe(_observer14));
                 _observer15.SetResource(source15.SubscribeSafe(_observer15));
 
-                SetUpstream(StableCompositeDisposable.Create(disposables));
+                SetUpstream(StableCompositeDisposable.CreateTrusted(disposables));
             }
 
             protected override TResult GetResult() => _resultSelector(_observer1.Values.Dequeue(), _observer2.Values.Dequeue(), _observer3.Values.Dequeue(), _observer4.Values.Dequeue(), _observer5.Values.Dequeue(), _observer6.Values.Dequeue(), _observer7.Values.Dequeue(), _observer8.Values.Dequeue(), _observer9.Values.Dequeue(), _observer10.Values.Dequeue(), _observer11.Values.Dequeue(), _observer12.Values.Dequeue(), _observer13.Values.Dequeue(), _observer14.Values.Dequeue(), _observer15.Values.Dequeue());
@@ -1573,7 +1573,7 @@ namespace System.Reactive.Linq.ObservableImpl
                 _observer15.SetResource(source15.SubscribeSafe(_observer15));
                 _observer16.SetResource(source16.SubscribeSafe(_observer16));
 
-                SetUpstream(StableCompositeDisposable.Create(disposables));
+                SetUpstream(StableCompositeDisposable.CreateTrusted(disposables));
             }
 
             protected override TResult GetResult() => _resultSelector(_observer1.Values.Dequeue(), _observer2.Values.Dequeue(), _observer3.Values.Dequeue(), _observer4.Values.Dequeue(), _observer5.Values.Dequeue(), _observer6.Values.Dequeue(), _observer7.Values.Dequeue(), _observer8.Values.Dequeue(), _observer9.Values.Dequeue(), _observer10.Values.Dequeue(), _observer11.Values.Dequeue(), _observer12.Values.Dequeue(), _observer13.Values.Dequeue(), _observer14.Values.Dequeue(), _observer15.Values.Dequeue(), _observer16.Values.Dequeue());

+ 1 - 1
Rx.NET/Source/src/System.Reactive/Linq/Observable/Zip.Generated.tt

@@ -97,7 +97,7 @@ for (var j = 1; j <= i; j++)
 }
 #>
 
-                SetUpstream(StableCompositeDisposable.Create(disposables));
+                SetUpstream(StableCompositeDisposable.CreateTrusted(disposables));
             }
 
             protected override TResult GetResult() => _resultSelector(<#=vs#>);

+ 0 - 3
Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Creation.cs

@@ -101,7 +101,6 @@ namespace System.Reactive.Linq
                 public Subscription(Func<IObserver<TResult>, CancellationToken, Task> subscribeAsync, IObserver<TResult> observer)
                 {
                     _subscription = subscribeAsync(observer, _cts.Token)
-                        .ToObservable()
                         .Subscribe(new TaskCompletionObserver(observer));
                 }
 
@@ -180,7 +179,6 @@ namespace System.Reactive.Linq
                     // Notice because we're using the AnonymousObservable<T> type, we get auto-detach behavior for free.
                     //
                     subscribeAsync(observer, _cts.Token)
-                        .ToObservable()
                         .Subscribe(_observer = new TaskDisposeCompletionObserver(observer));
                 }
 
@@ -262,7 +260,6 @@ namespace System.Reactive.Linq
                     // Notice because we're using the AnonymousObservable<T> type, we get auto-detach behavior for free.
                     //
                     subscribeAsync(observer, _cts.Token)
-                        .ToObservable()
                         .Subscribe(_observer = new TaskDisposeCompletionObserver(observer));
                 }
 

+ 2 - 2
Rx.NET/Source/src/System.Reactive/Platforms/UWP/Linq/AsyncInfoObservable.cs

@@ -95,7 +95,7 @@ namespace System.Reactive.Linq
                     var dataSubscription = data.DefaultIfEmpty().Subscribe(obs);
                     var connection = data.Connect();
 
-                    return StableCompositeDisposable.Create(progressSubscription, dataSubscription, connection);
+                    return StableCompositeDisposable.CreateTrusted(progressSubscription, dataSubscription, connection);
                 }).ToTask(ct);
             });
         }
@@ -218,7 +218,7 @@ namespace System.Reactive.Linq
                     var dataSubscription = resultSelector(data).Subscribe(obs);
                     var connection = data.Connect();
 
-                    return StableCompositeDisposable.Create(progressSubscription, dataSubscription, connection);
+                    return StableCompositeDisposable.CreateTrusted(progressSubscription, dataSubscription, connection);
                 }).ToTask(ct);
             });
         }

+ 56 - 51
Rx.NET/Source/src/System.Reactive/Threading/Tasks/TaskObservableExtensions.cs

@@ -3,6 +3,7 @@
 // See the LICENSE file in the project root for more information. 
 
 using System.Reactive.Concurrency;
+using System.Reactive.Disposables;
 using System.Reactive.Linq;
 using System.Reactive.Linq.ObservableImpl;
 using System.Reactive.Subjects;
@@ -58,48 +59,30 @@ namespace System.Reactive.Threading.Tasks
 
         private static IObservable<Unit> ToObservableImpl(Task task, IScheduler scheduler)
         {
-            var res = default(IObservable<Unit>);
-
             if (task.IsCompleted)
             {
                 scheduler = scheduler ?? ImmediateScheduler.Instance;
 
                 switch (task.Status)
                 {
-                    case TaskStatus.RanToCompletion:
-                        res = new Return<Unit>(Unit.Default, scheduler);
-                        break;
                     case TaskStatus.Faulted:
-                        res = new Throw<Unit>(task.Exception.InnerException, scheduler);
-                        break;
+                        return new Throw<Unit>(task.Exception.InnerException, scheduler);
                     case TaskStatus.Canceled:
-                        res = new Throw<Unit>(new TaskCanceledException(task), scheduler);
-                        break;
+                        return new Throw<Unit>(new TaskCanceledException(task), scheduler);
                 }
-            }
-            else
-            {
-                //
-                // Separate method to avoid closure in synchronous completion case.
-                //
-                res = ToObservableSlow(task, scheduler);
-            }
 
-            return res;
-        }
+                return new Return<Unit>(Unit.Default, scheduler);
+            }
 
-        private static IObservable<Unit> ToObservableSlow(Task task, IScheduler scheduler)
-        {
             var subject = new AsyncSubject<Unit>();
-
             var options = GetTaskContinuationOptions(scheduler);
 
-            task.ContinueWith(t => ToObservableDone(task, subject), options);
+            task.ContinueWith((t, subjectObject) => t.EmitTaskResult((AsyncSubject<Unit>)subjectObject), subject, options);
 
-            return ToObservableResult(subject, scheduler);
+            return subject.ToObservableResult(scheduler);
         }
 
-        private static void ToObservableDone(Task task, IObserver<Unit> subject)
+        private static void EmitTaskResult(this Task task, IObserver<Unit> subject)
         {
             switch (task.Status)
             {
@@ -116,6 +99,26 @@ namespace System.Reactive.Threading.Tasks
             }
         }
 
+        internal static IDisposable Subscribe(this Task task, IObserver<Unit> observer)
+        {
+            if (task.IsCompleted)
+            {
+                task.EmitTaskResult(observer);
+                return Disposable.Empty;
+            }
+
+            var cts = new CancellationDisposable();
+
+            task.ContinueWith(
+                (t, observerObject) => t.EmitTaskResult((IObserver<Unit>)observerObject), 
+                observer, 
+                cts.Token, 
+                TaskContinuationOptions.ExecuteSynchronously, 
+                TaskScheduler.Default);
+
+            return cts;
+        }
+
         /// <summary>
         /// Returns an observable sequence that propagates the result of the task.
         /// </summary>
@@ -160,48 +163,30 @@ namespace System.Reactive.Threading.Tasks
 
         private static IObservable<TResult> ToObservableImpl<TResult>(Task<TResult> task, IScheduler scheduler)
         {
-            var res = default(IObservable<TResult>);
-
             if (task.IsCompleted)
             {
                 scheduler = scheduler ?? ImmediateScheduler.Instance;
 
                 switch (task.Status)
                 {
-                    case TaskStatus.RanToCompletion:
-                        res = new Return<TResult>(task.Result, scheduler);
-                        break;
                     case TaskStatus.Faulted:
-                        res = new Throw<TResult>(task.Exception.InnerException, scheduler);
-                        break;
+                        return new Throw<TResult>(task.Exception.InnerException, scheduler);
                     case TaskStatus.Canceled:
-                        res = new Throw<TResult>(new TaskCanceledException(task), scheduler);
-                        break;
+                        return new Throw<TResult>(new TaskCanceledException(task), scheduler);
                 }
-            }
-            else
-            {
-                //
-                // Separate method to avoid closure in synchronous completion case.
-                //
-                res = ToObservableSlow(task, scheduler);
-            }
 
-            return res;
-        }
+                return new Return<TResult>(task.Result, scheduler);
+            }
 
-        private static IObservable<TResult> ToObservableSlow<TResult>(Task<TResult> task, IScheduler scheduler)
-        {
             var subject = new AsyncSubject<TResult>();
-
             var options = GetTaskContinuationOptions(scheduler);
 
-            task.ContinueWith(t => ToObservableDone(task, subject), options);
+            task.ContinueWith((t, subjectObject) => t.EmitTaskResult((AsyncSubject<TResult>)subjectObject), subject, options);
 
-            return ToObservableResult(subject, scheduler);
+            return subject.ToObservableResult(scheduler);
         }
 
-        private static void ToObservableDone<TResult>(Task<TResult> task, IObserver<TResult> subject)
+        private static void EmitTaskResult<TResult>(this Task<TResult> task, IObserver<TResult> subject)
         {
             switch (task.Status)
             {
@@ -240,7 +225,7 @@ namespace System.Reactive.Threading.Tasks
             return options;
         }
 
-        private static IObservable<TResult> ToObservableResult<TResult>(AsyncSubject<TResult> subject, IScheduler scheduler)
+        private static IObservable<TResult> ToObservableResult<TResult>(this AsyncSubject<TResult> subject, IScheduler scheduler)
         {
             if (scheduler != null)
             {
@@ -250,6 +235,26 @@ namespace System.Reactive.Threading.Tasks
             return subject.AsObservable();
         }
 
+        internal static IDisposable Subscribe<TResult>(this Task<TResult> task, IObserver<TResult> observer)
+        {
+            if (task.IsCompleted)
+            {
+                task.EmitTaskResult(observer);
+                return Disposable.Empty;
+            }
+
+            var cts = new CancellationDisposable();
+
+            task.ContinueWith(
+                (t, observerObject) => t.EmitTaskResult((IObserver<TResult>)observerObject), 
+                observer, 
+                cts.Token, 
+                TaskContinuationOptions.ExecuteSynchronously, 
+                TaskScheduler.Default);
+
+            return cts;
+        }
+
         /// <summary>
         /// Returns a task that will receive the last value or the exception produced by the observable sequence.
         /// </summary>