Browse Source

A failing observer shall never be given its own exception.

Peter Wehrfritz 6 years ago
parent
commit
de239616ff
1 changed files with 22 additions and 18 deletions
  1. 22 18
      Ix.NET/Source/System.Linq.Async/System/Linq/Operators/ToObservable.cs

+ 22 - 18
Ix.NET/Source/System.Linq.Async/System/Linq/Operators/ToObservable.cs

@@ -3,7 +3,6 @@
 // See the LICENSE file in the project root for more information. 
 
 using System.Collections.Generic;
-using System.Threading.Tasks;
 
 namespace System.Linq
 {
@@ -32,30 +31,35 @@ namespace System.Linq
 
                 async void Core()
                 {
-                    try
+                    // REVIEW: fire-and-forget DisposeAsync?
+                    await using (var e = _source.GetAsyncEnumerator(ctd.Token))
                     {
-                        await foreach (var element in _source.WithCancellation(ctd.Token).ConfigureAwait(false))
+                        do
                         {
-                            observer.OnNext(element);
+                            bool hasNext;
+                            try
+                            {
+                                hasNext = await e.MoveNextAsync().ConfigureAwait(false);
+                            }
+                            catch (Exception ex)
+                            {
+                                if (!ctd.Token.IsCancellationRequested)
+                                {
+                                    observer.OnError(ex);
+                                }
 
-                            if (ctd.Token.IsCancellationRequested)
+                                return;
+                            }
+
+                            if (!hasNext)
                             {
+                                observer.OnCompleted();
                                 return;
                             }
-                        }
-                    }
-                    catch (Exception error)
-                    {
-                        if (!ctd.Token.IsCancellationRequested)
-                        {
-                            observer.OnError(error);
-                        }
-                        return;
-                    }
 
-                    if (!ctd.Token.IsCancellationRequested)
-                    {
-                        observer.OnCompleted();
+                            observer.OnNext(e.Current);
+                        }
+                        while (!ctd.Token.IsCancellationRequested);
                     }
                 }