ソースを参照

Simplify ToObservable implementation.

Bart De Smet 7 年 前
コミット
6716857cb4

+ 1 - 1
Ix.NET/Source/System.Linq.Async.Tests/System/Linq/Operators/ToObservable.cs

@@ -132,7 +132,7 @@ namespace Tests
 
             evt.WaitOne();
             Assert.False(fail);
-            Assert.Equal(ex1, ((AggregateException)ex_).InnerExceptions.Single());
+            Assert.Equal(ex1, ex_);
         }
 
         [Fact]

+ 33 - 28
Ix.NET/Source/System.Linq.Async/System/Linq/Operators/ToObservable.cs

@@ -30,44 +30,49 @@ namespace System.Linq
                 var ctd = new CancellationTokenDisposable();
                 var e = _source.GetAsyncEnumerator(ctd.Token);
 
-                void Core() => e.MoveNextAsync().AsTask().ContinueWith(
-                    async t =>
+                async void Core()
+                {
+                    bool hasNext;
+                    try
                     {
-                        if (t.IsFaulted)
-                        {
-                            observer.OnError(t.Exception);
-                            await e.DisposeAsync().ConfigureAwait(false);
-                        }
-                        else if (t.IsCanceled)
+                        hasNext = await e.MoveNextAsync().ConfigureAwait(false);
+                    }
+                    catch (Exception ex)
+                    {
+                        if (!ctd.Token.IsCancellationRequested)
                         {
+                            observer.OnError(ex);
                             await e.DisposeAsync().ConfigureAwait(false);
                         }
-                        else if (t.IsCompleted)
-                        {
-                            if (t.Result)
-                            {
-                                observer.OnNext(e.Current);
 
-                                if (!ctd.Token.IsCancellationRequested)
-                                {
-                                    Core();
-                                }
+                        return;
+                    }
 
-                                // In case cancellation is requested, this could only have happened
-                                // by disposing the returned composite disposable (see below).
-                                // In that case, e will be disposed too, so there is no need to dispose e here.
-                            }
-                            else
-                            {
-                                observer.OnCompleted();
-                                await e.DisposeAsync().ConfigureAwait(false);
-                            }
+                    if (hasNext)
+                    {
+                        observer.OnNext(e.Current);
+
+                        if (!ctd.Token.IsCancellationRequested)
+                        {
+                            Core();
                         }
-                    }, ctd.Token);
+
+                        // In case cancellation is requested, this could only have happened
+                        // by disposing the returned composite disposable (see below).
+                        // In that case, e will be disposed too, so there is no need to dispose e here.
+                    }
+                    else
+                    {
+                        observer.OnCompleted();
+                        await e.DisposeAsync().ConfigureAwait(false);
+                    }
+                }
 
                 Core();
 
-                return Disposable.Create(ctd, Disposable.Create(() => { e.DisposeAsync(); /* REVIEW: fire-and-forget? */ }));
+                // REVIEW: Safety of concurrent dispose operation; fire-and-forget nature of dispose?
+
+                return Disposable.Create(ctd, Disposable.Create(() => { e.DisposeAsync(); }));
             }
         }
     }