瀏覽代碼

Regression tests for subtle behavior in ForEachAsync.

Bart De Smet 10 年之前
父節點
當前提交
5abdc70c22
共有 1 個文件被更改,包括 106 次插入3 次删除
  1. 106 3
      Rx.NET/Source/Tests.System.Reactive/Tests/Linq/ObservableImperativeTest.cs

+ 106 - 3
Rx.NET/Source/Tests.System.Reactive/Tests/Linq/ObservableImperativeTest.cs

@@ -410,8 +410,7 @@ namespace ReactiveTests.Tests
         }
 
         [TestMethod]
-        [Ignore]
-        public void ForEachAsync_DisposeThrows()
+        public void ForEachAsync_DisposeThrows1()
         {
             var cts = new CancellationTokenSource();
             var ex = new Exception();
@@ -419,7 +418,7 @@ namespace ReactiveTests.Tests
             var xs = Observable.Create<int>(observer =>
             {
                 return new CompositeDisposable(
-                    Observable.Range(0, 10, Scheduler.Default).Subscribe(observer),
+                    Observable.Range(0, 10, Scheduler.CurrentThread).Subscribe(observer),
                     Disposable.Create(() => { throw ex; })
                 );
             });
@@ -427,11 +426,62 @@ namespace ReactiveTests.Tests
             var lst = new List<int>();
             var t = xs.ForEachAsync(lst.Add, cts.Token);
 
+            //
+            // Unfortunately, this doesn't throw for CurrentThread scheduling. The
+            // subscription completes prior to assignment of the disposable, so we
+            // succeed calling the TrySetResult method for the OnCompleted handler
+            // prior to observing the exception of the Dispose operation, which is
+            // surfacing upon assignment to the SingleAssignmentDisposable. As a
+            // result, the exception evaporates.
+            //
+            // It'd be a breaking change at this point to rethrow the exception in
+            // that case, so we're merely asserting regressions here.
+            //
             try
             {
                 t.Wait();
+            }
+            catch
+            {
                 Assert.Fail();
             }
+        }
+
+        [TestMethod]
+        public void ForEachAsync_DisposeThrows2()
+        {
+            var cts = new CancellationTokenSource();
+            var ex = new Exception();
+
+            var xs = Observable.Create<int>(observer =>
+            {
+                return new CompositeDisposable(
+                    Observable.Range(0, 10, Scheduler.CurrentThread).Subscribe(observer),
+                    Disposable.Create(() => { throw ex; })
+                );
+            });
+
+            var lst = new List<int>();
+
+            var t = default(Task);
+
+            Scheduler.CurrentThread.Schedule(() =>
+            {
+                t = xs.ForEachAsync(lst.Add, cts.Token);
+            });
+
+            //
+            // If the trampoline of the CurrentThread has been installed higher
+            // up the stack, the assignment of the subscription's disposable to
+            // the SingleAssignmentDisposable can complete prior to the Dispose
+            // method being called from the OnCompleted handler. In this case,
+            // the OnCompleted handler's invocation of Dispose will cause the
+            // exception to occur, and it bubbles out through TrySetException.
+            //
+            try
+            {
+                t.Wait();
+            }
             catch (AggregateException err)
             {
                 Assert.AreEqual(1, err.InnerExceptions.Count);
@@ -439,6 +489,59 @@ namespace ReactiveTests.Tests
             }
         }
 
+        [TestMethod]
+        public void ForEachAsync_DisposeThrows()
+        {
+            //
+            // Unfortunately, this test is non-deterministic due to the race
+            // conditions described above in the tests using the CurrentThread
+            // scheduler. The exception can come out through the OnCompleted
+            // handler but can equally well get swallowed if the main thread
+            // hasn't reached the assignment of the disposable yet, causing
+            // the OnCompleted handler to win the race. The user can deal with
+            // this by hooking an exception handler to the scheduler, so we
+            // assert this behavior here.
+            //
+            // It'd be a breaking change at this point to change rethrowing
+            // behavior, so we're merely asserting regressions here.
+            //
+
+            var hasCaughtEscapingException = false;
+
+            var cts = new CancellationTokenSource();
+            var ex = new Exception();
+
+            var s = Scheduler.Default.Catch<Exception>(err =>
+            {
+                Volatile.Write(ref hasCaughtEscapingException, true);
+                return ex == err;
+            });
+
+            while (!Volatile.Read(ref hasCaughtEscapingException))
+            {
+                var xs = Observable.Create<int>(observer =>
+                {
+                    return new CompositeDisposable(
+                        Observable.Range(0, 10, s).Subscribe(observer),
+                        Disposable.Create(() => { throw ex; })
+                    );
+                });
+
+                var lst = new List<int>();
+                var t = xs.ForEachAsync(lst.Add, cts.Token);
+
+                try
+                {
+                    t.Wait();
+                }
+                catch (AggregateException err)
+                {
+                    Assert.AreEqual(1, err.InnerExceptions.Count);
+                    Assert.AreSame(ex, err.InnerExceptions[0]);
+                }
+            }
+        }
+
         [TestMethod]
         public void ForEachAsync_SubscribeThrows()
         {