ソースを参照

4.x: Fix accidental behavior change with Task-based Create methods completing when the body ends

(cherry picked from commit b2a449fd67473ea428d942757aefa8f5e2a15bea)
akarnokd 7 年 前
コミット
2baca5735f

+ 0 - 2
Rx.NET/Source/src/System.Reactive/Linq/QueryLanguage.Creation.cs

@@ -155,7 +155,6 @@ namespace System.Reactive.Linq
 
                     public void OnCompleted()
                     {
-                        _observer.OnCompleted();
                     }
 
                     public void OnError(Exception error)
@@ -233,7 +232,6 @@ namespace System.Reactive.Linq
 
                     public void OnCompleted()
                     {
-                        _observer.OnCompleted();
                     }
 
                     public void OnError(Exception error)

+ 41 - 0
Rx.NET/Source/tests/Tests.System.Reactive/Tests/Linq/Observable/CreateAsyncTest.cs

@@ -705,5 +705,46 @@ namespace ReactiveTests.Tests
             Assert.True(lst.Take(10).SequenceEqual(Enumerable.Repeat(42, 10)));
         }
 
+
+        [Fact]
+        public void CreateWithTaskDisposable_NoPrematureTermination()
+        {
+            var obs = Observable.Create<int>(async o =>
+            {
+                // avoid warning on async o due to no await
+                await Task.CompletedTask;
+
+                var inner = Observable.Range(1, 3);
+
+                return inner.Subscribe(x =>
+                {
+                    o.OnNext(x);
+                });
+            });
+
+            var result = obs.Take(1).Wait();
+        }
+
+        [Fact]
+        public void CreateWithTaskAction_NoPrematureTermination()
+        {
+            var obs = Observable.Create<int>(async o =>
+            {
+                // avoid warning on async o due to no await
+                await Task.CompletedTask;
+
+                var inner = Observable.Range(1, 3);
+
+                var d = inner.Subscribe(x =>
+                {
+                    o.OnNext(x);
+                });
+
+                Action a = () => d.Dispose();
+                return a;
+            });
+
+            var result = obs.Take(1).Wait();
+        }
     }
 }