Преглед на файлове

Merge pull request #915 from quinmars/fix-contract-in-ToObservable

Ix: Do not dispose the enumerator while enumerating in the ToObservable operator.
Daniel C. Weber преди 6 години
родител
ревизия
46d757bf51

+ 73 - 1
Ix.NET/Source/System.Linq.Async.Tests/System/Linq/Operators/ToObservable.cs

@@ -108,7 +108,7 @@ namespace Tests
         }
 
         [Fact]
-        public void ToObservable4()
+        public void ToObservable_ThrowOnMoveNext()
         {
             using var evt = new ManualResetEvent(false);
 
@@ -139,6 +139,37 @@ namespace Tests
             Assert.Equal(ex1, ex_);
         }
 
+        [Fact]
+        public void ToObservable_ThrowOnCurrent()
+        {
+            var ex1 = new Exception("Bang!");
+            var ex_ = default(Exception);
+            var fail = false;
+
+            var ae = AsyncEnumerable.Create(
+                _ => new ThrowOnCurrentAsyncEnumerator(ex1)
+            );
+
+            ae.ToObservable()
+                .Subscribe(new MyObserver<int>(
+                x =>
+                {
+                    fail = true;
+                },
+                ex =>
+                {
+                    ex_ = ex;
+                },
+                () =>
+                {
+                    fail = true;
+                }
+            ));
+
+            Assert.False(fail);
+            Assert.Equal(ex1, ex_);
+        }
+
         [Fact]
         public void ToObservable_DisposesEnumeratorOnCompletion()
         {
@@ -269,6 +300,34 @@ namespace Tests
             Assert.Equal(1, moveNextCount);
             Assert.False(fail);
         }
+        
+        [Fact]
+        public void ToObservable_SupportsLargeEnumerable()
+        {
+            using var evt = new ManualResetEvent(false);
+
+            var fail = false;
+
+            var xs = AsyncEnumerable.Range(0, 10000).ToObservable();
+            xs.Subscribe(new MyObserver<int>(
+                x =>
+                {
+                    // ok
+                },
+                ex =>
+                {
+                    fail = true;
+                    evt.Set();
+                },
+                () =>
+                {
+                    evt.Set();
+                }
+            ));
+
+            evt.WaitOne();
+            Assert.False(fail);
+        }
 
         private sealed class MyObserver<T> : IObserver<T>
         {
@@ -289,5 +348,18 @@ namespace Tests
 
             public void OnNext(T value) => _onNext(value);
         }
+
+        private sealed class ThrowOnCurrentAsyncEnumerator : IAsyncEnumerator<int>
+        {
+            readonly private Exception _exception;
+            public ThrowOnCurrentAsyncEnumerator(Exception ex)
+            {
+                _exception = ex;
+            }
+
+            public int Current => throw _exception;
+            public ValueTask DisposeAsync() => default;
+            public ValueTask<bool> MoveNextAsync() => new ValueTask<bool>(true);
+        }
     }
 }

+ 30 - 31
Ix.NET/Source/System.Linq.Async/System/Linq/Operators/ToObservable.cs

@@ -28,51 +28,50 @@ namespace System.Linq
             public IDisposable Subscribe(IObserver<T> observer)
             {
                 var ctd = new CancellationTokenDisposable();
-                var e = _source.GetAsyncEnumerator(ctd.Token);
 
                 async void Core()
                 {
-                    bool hasNext;
-                    try
+                    await using (var e = _source.GetAsyncEnumerator(ctd.Token))
                     {
-                        hasNext = await e.MoveNextAsync().ConfigureAwait(false);
-                    }
-                    catch (Exception ex)
-                    {
-                        if (!ctd.Token.IsCancellationRequested)
+                        do
                         {
-                            observer.OnError(ex);
-                            await e.DisposeAsync().ConfigureAwait(false);
-                        }
+                            bool hasNext;
+                            var value = default(T)!;
 
-                        return;
-                    }
+                            try
+                            {
+                                hasNext = await e.MoveNextAsync().ConfigureAwait(false);
+                                if (hasNext)
+                                {
+                                    value = e.Current;
+                                }
+                            }
+                            catch (Exception ex)
+                            {
+                                if (!ctd.Token.IsCancellationRequested)
+                                {
+                                    observer.OnError(ex);
+                                }
 
-                    if (hasNext)
-                    {
-                        observer.OnNext(e.Current);
+                                return;
+                            }
 
-                        if (!ctd.Token.IsCancellationRequested)
-                        {
-                            Core();
-                        }
+                            if (!hasNext)
+                            {
+                                observer.OnCompleted();
+                                return;
+                            }
 
-                        // In case cancellation is requested, this could only have happened
-                        // by disposing the returned composite disposable (see below).
-                        // In that case, e will be disposed too, so there is no need to dispose e here.
-                    }
-                    else
-                    {
-                        observer.OnCompleted();
-                        await e.DisposeAsync().ConfigureAwait(false);
+                            observer.OnNext(value);
+                        }
+                        while (!ctd.Token.IsCancellationRequested);
                     }
                 }
 
+                // Fire and forget
                 Core();
 
-                // REVIEW: Safety of concurrent dispose operation; fire-and-forget nature of dispose?
-
-                return Disposable.Create(ctd, Disposable.Create(() => { e.DisposeAsync(); }));
+                return ctd;
             }
         }
     }