Browse Source

Do not dispose the enumerator while enumerating in the ToObservable operator.

Peter Wehrfritz 6 years ago
parent
commit
cdd5e2717a
1 changed files with 15 additions and 25 deletions
  1. 15 25
      Ix.NET/Source/System.Linq.Async/System/Linq/Operators/ToObservable.cs

+ 15 - 25
Ix.NET/Source/System.Linq.Async/System/Linq/Operators/ToObservable.cs

@@ -3,6 +3,7 @@
 // See the LICENSE file in the project root for more information. 
 
 using System.Collections.Generic;
+using System.Threading.Tasks;
 
 namespace System.Linq
 {
@@ -28,51 +29,40 @@ namespace System.Linq
             public IDisposable Subscribe(IObserver<T> observer)
             {
                 var ctd = new CancellationTokenDisposable();
-                var e = _source.GetAsyncEnumerator(ctd.Token);
 
                 async void Core()
                 {
-                    bool hasNext;
                     try
                     {
-                        hasNext = await e.MoveNextAsync().ConfigureAwait(false);
-                    }
-                    catch (Exception ex)
-                    {
-                        if (!ctd.Token.IsCancellationRequested)
+                        await foreach (var element in _source.WithCancellation(ctd.Token).ConfigureAwait(false))
                         {
-                            observer.OnError(ex);
-                            await e.DisposeAsync().ConfigureAwait(false);
-                        }
+                            observer.OnNext(element);
 
-                        return;
+                            if (ctd.Token.IsCancellationRequested)
+                            {
+                                return;
+                            }
+                        }
                     }
-
-                    if (hasNext)
+                    catch (Exception error)
                     {
-                        observer.OnNext(e.Current);
-
                         if (!ctd.Token.IsCancellationRequested)
                         {
-                            Core();
+                            observer.OnError(error);
                         }
-
-                        // In case cancellation is requested, this could only have happened
-                        // by disposing the returned composite disposable (see below).
-                        // In that case, e will be disposed too, so there is no need to dispose e here.
+                        return;
                     }
-                    else
+
+                    if (!ctd.Token.IsCancellationRequested)
                     {
                         observer.OnCompleted();
-                        await e.DisposeAsync().ConfigureAwait(false);
                     }
                 }
 
+                // Fire and forget
                 Core();
 
-                // REVIEW: Safety of concurrent dispose operation; fire-and-forget nature of dispose?
-
-                return Disposable.Create(ctd, Disposable.Create(() => { e.DisposeAsync(); }));
+                return ctd;
             }
         }
     }