瀏覽代碼

Allowing tail recursive Catch operator to catch subscription exceptions.

Bart De Smet 10 年之前
父節點
當前提交
ff932c0af4

+ 33 - 3
Rx.NET/Source/System.Reactive.Linq/Reactive/Internal/TailRecursiveSink.cs

@@ -74,6 +74,12 @@ namespace System.Reactive
                 {
                     e.Dispose();
 
+                    //
+                    // Failure to enumerate the sequence cannot be handled, even by
+                    // operators like Catch, because it'd lead to another attempt at
+                    // enumerating to find the next observable sequence. Therefore,
+                    // we feed those errors directly to the observer.
+                    //
                     base._observer.OnError(ex);
                     base.Dispose();
                     return;
@@ -98,9 +104,18 @@ namespace System.Reactive
                     }
                     catch (Exception exception)
                     {
-                        e.Dispose();
-                        base._observer.OnError(exception);
-                        base.Dispose();
+                        //
+                        // Errors from unpacking may produce side-effects that normally
+                        // would occur during a SubscribeSafe operation. Those would feed
+                        // back into the observer and be subject to the operator's error
+                        // handling behavior. For example, Catch would allow to handle
+                        // the error using a handler function.
+                        //
+                        if (!Fail(exception))
+                        {
+                            e.Dispose();
+                        }
+
                         return;
                     }
 
@@ -110,6 +125,7 @@ namespace System.Reactive
                     if (r == 0)
                     {
                         e.Dispose();
+
                         _stack.Pop();
                         _length.Pop();
                     }
@@ -165,6 +181,12 @@ namespace System.Reactive
             }
             catch (Exception exception)
             {
+                //
+                // Failure to enumerate the sequence cannot be handled, even by
+                // operators like Catch, because it'd lead to another attempt at
+                // enumerating to find the next observable sequence. Therefore,
+                // we feed those errors directly to the observer.
+                //
                 base._observer.OnError(exception);
                 base.Dispose();
 
@@ -182,6 +204,14 @@ namespace System.Reactive
             base._observer.OnCompleted();
             base.Dispose();
         }
+
+        protected virtual bool Fail(Exception error)
+        {
+            base._observer.OnError(error);
+            base.Dispose();
+
+            return false;
+        }
     }
 }
 #endif

+ 11 - 1
Rx.NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Catch.cs

@@ -3,7 +3,6 @@
 #if !NO_PERF
 using System;
 using System.Collections.Generic;
-using System.Reactive.Concurrency;
 using System.Reactive.Disposables;
 
 namespace System.Reactive.Linq.ObservableImpl
@@ -68,6 +67,17 @@ namespace System.Reactive.Linq.ObservableImpl
 
                 base.Dispose();
             }
+
+            protected override bool Fail(Exception error)
+            {
+                //
+                // Note that the invocation of _recurse in OnError will
+                // cause the next MoveNext operation to be enqueued, so
+                // we will still return to the caller immediately.
+                //
+                OnError(error);
+                return true;
+            }
         }
     }
 

+ 11 - 2
Rx.NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/OnErrorResumeNext.cs

@@ -3,8 +3,6 @@
 #if !NO_PERF
 using System;
 using System.Collections.Generic;
-using System.Reactive.Concurrency;
-using System.Reactive.Disposables;
 
 namespace System.Reactive.Linq.ObservableImpl
 {
@@ -54,6 +52,17 @@ namespace System.Reactive.Linq.ObservableImpl
             {
                 _recurse();
             }
+
+            protected override bool Fail(Exception error)
+            {
+                //
+                // Note that the invocation of _recurse in OnError will
+                // cause the next MoveNext operation to be enqueued, so
+                // we will still return to the caller immediately.
+                //
+                OnError(error);
+                return true;
+            }
         }
     }
 }

文件差異過大導致無法顯示
+ 185 - 121
Rx.NET/Source/Tests.System.Reactive/Tests/Linq/ObservableMultipleTest.cs


部分文件因文件數量過多而無法顯示