Browse Source

Merge pull request #979 from dotnet/cherry-pick-832-to-41

Cherry pick 832 to 41
Oren Novotny 6 years ago
parent
commit
27cbac6309

+ 29 - 5
Rx.NET/Source/src/System.Reactive/Linq/Observable/Finally.cs

@@ -3,6 +3,7 @@
 // See the LICENSE file in the project root for more information. 
 
 using System.Reactive.Disposables;
+using System.Threading;
 
 namespace System.Reactive.Linq.ObservableImpl
 {
@@ -24,7 +25,6 @@ namespace System.Reactive.Linq.ObservableImpl
         internal sealed class _ : IdentitySink<TSource>
         {
             private readonly Action _finallyAction;
-
             private IDisposable _sourceDisposable;
 
             public _(Action finallyAction, IObserver<TSource> observer)
@@ -35,19 +35,43 @@ namespace System.Reactive.Linq.ObservableImpl
 
             public override void Run(IObservable<TSource> source)
             {
-                Disposable.SetSingle(ref _sourceDisposable, source.SubscribeSafe(this));
+                var d = source.SubscribeSafe(this);
+
+                if (Interlocked.CompareExchange(ref _sourceDisposable, d, null) == BooleanDisposable.True)
+                {
+                    // The Dispose(bool) methode was already called before the
+                    // subscription could be assign, hence the subscription
+                    // needs to be diposed here and the action needs to be invoked.
+                    try
+                    {
+                        d.Dispose();
+                    }
+                    finally
+                    {
+                        _finallyAction();
+                    }
+                }
             }
 
             protected override void Dispose(bool disposing)
             {
+                base.Dispose(disposing);
+
                 if (disposing)
                 {
-                    if (Disposable.TryDispose(ref _sourceDisposable))
+                    var d = Interlocked.Exchange(ref _sourceDisposable, BooleanDisposable.True);
+                    if (d != BooleanDisposable.True && d != null)
                     {
-                        _finallyAction();
+                        try
+                        {
+                            d.Dispose();
+                        }
+                        finally
+                        {
+                            _finallyAction();
+                        }
                     }
                 }
-                base.Dispose(disposing);
             }
         }
     }

+ 9 - 5
Rx.NET/Source/src/System.Reactive/Linq/Observable/Using.cs

@@ -34,33 +34,37 @@ namespace System.Reactive.Linq.ObservableImpl
             public void Run(Using<TSource, TResource> parent)
             {
                 var source = default(IObservable<TSource>);
+                var disposable = Disposable.Empty;
                 try
                 {
                     var resource = parent._resourceFactory();
                     if (resource != null)
                     {
-                        Disposable.SetSingle(ref _disposable, resource);
+                        disposable = resource;
                     }
 
                     source = parent._observableFactory(resource);
                 }
                 catch (Exception exception)
                 {
-                    SetUpstream(Observable.Throw<TSource>(exception).SubscribeSafe(this));
-
-                    return;
+                    source = Observable.Throw<TSource>(exception);
                 }
 
+                // It is important to set the disposable resource after
+                // Run(). In the synchronous case this would else dispose
+                // the the resource before the source subscription.
                 Run(source);
+                Disposable.SetSingle(ref _disposable, disposable);
             }
 
             protected override void Dispose(bool disposing)
             {
+                base.Dispose(disposing);
+
                 if (disposing)
                 {
                     Disposable.TryDispose(ref _disposable);
                 }
-                base.Dispose(disposing);
             }
         }
     }

+ 44 - 0
Rx.NET/Source/tests/Tests.System.Reactive/Tests/Linq/Observable/FinallyTest.cs

@@ -3,6 +3,7 @@
 // See the LICENSE file in the project root for more information. 
 
 using System;
+using System.Reactive;
 using System.Reactive.Linq;
 using Microsoft.Reactive.Testing;
 using Xunit;
@@ -142,5 +143,48 @@ namespace ReactiveTests.Tests
             );
         }
 
+        [Fact]
+        public void Finally_DisposeOrder_Empty()
+        {
+            var order = "";
+            Observable
+                .Empty<Unit>()
+                .Finally(() => order += "1")
+                .Finally(() => order += "2")
+                .Finally(() => order += "3")
+                .Subscribe();
+
+            Assert.Equal("123", order);
+        }
+
+        [Fact]
+        public void Finally_DisposeOrder_Return()
+        {
+            var order = "";
+            Observable
+                .Return(Unit.Default)
+                .Finally(() => order += "1")
+                .Finally(() => order += "2")
+                .Finally(() => order += "3")
+                .Subscribe();
+
+            Assert.Equal("123", order);
+        }
+
+        [Fact]
+        public void Finally_DisposeOrder_Never()
+        {
+            var order = "";
+            var d = Observable
+                .Never<Unit>()
+                .Finally(() => order += "1")
+                .Finally(() => order += "2")
+                .Finally(() => order += "3")
+                .Subscribe();
+
+            d.Dispose();
+
+            Assert.Equal("123", order);
+        }
     }
 }

+ 32 - 0
Rx.NET/Source/tests/Tests.System.Reactive/Tests/Linq/Observable/UsingTest.cs

@@ -3,6 +3,8 @@
 // See the LICENSE file in the project root for more information. 
 
 using System;
+using System.Reactive;
+using System.Reactive.Disposables;
 using System.Reactive.Linq;
 using Microsoft.Reactive.Testing;
 using ReactiveTests.Dummies;
@@ -295,5 +297,35 @@ namespace ReactiveTests.Tests
             );
         }
 
+        [Fact]
+        public void Using_NestedCompleted()
+        {
+            var order = "";
+
+            Observable.Using(() => Disposable.Create(() => order += "3"),
+                _ => Observable.Using(() => Disposable.Create(() => order += "2"),
+                    __ => Observable.Using(() => Disposable.Create(() => order += "1"),
+                        ___ => Observable.Return(Unit.Default))))
+                .Finally(() => order += "4")
+                .Subscribe();
+
+            Assert.Equal("1234", order);
+        }
+
+        [Fact]
+        public void Using_NestedDisposed()
+        {
+            var order = "";
+
+            Observable.Using(() => Disposable.Create(() => order += "3"),
+                _ => Observable.Using(() => Disposable.Create(() => order += "2"),
+                    __ => Observable.Using(() => Disposable.Create(() => order += "1"),
+                        ___ => Observable.Never<Unit>())))
+                .Finally(() => order += "4")
+                .Subscribe()
+                .Dispose();
+
+            Assert.Equal("1234", order);
+        }
     }
 }

+ 1 - 1
Rx.NET/Source/version.json

@@ -1,5 +1,5 @@
 {
-  "version": "4.1.5",
+  "version": "4.1.6",
   "publicReleaseRefSpec": [
     "^refs/heads/master$", // we release out of master
     "^refs/heads/rel/v\\d+\\.\\d+" // we also release branches starting with rel/vN.N