Przeglądaj źródła

Merge pull request #6 from Blewzman/FixAsyncEnumerableSelectMany

Dispose inner IAsyncEnumerators in AsyncEnumerable.SelectMany.
headinthebox 11 lat temu
rodzic
commit
0f3ca5957e

+ 39 - 4
Ix.NET/Source/System.Interactive.Async/AsyncEnumerable.Single.cs

@@ -244,11 +244,28 @@ namespace System.Linq
 
 
             return Create(() =>
             return Create(() =>
             {
             {
+                // A lock seems inevitable. Disposal of the outer enumerator and completion of
+                // MoveNext of the inner enumerator can happen concurrently.
+                var syncRoot = new object();
                 var e = source.GetEnumerator();
                 var e = source.GetEnumerator();
                 var ie = default(IAsyncEnumerator<TResult>);
                 var ie = default(IAsyncEnumerator<TResult>);
 
 
+                var disposeIe = new Action(() =>
+                {
+                    IAsyncEnumerator<TResult> localIe;
+
+                    lock (syncRoot)
+                    {
+                        localIe = ie;
+                        ie = null;
+                    }
+
+                    if (localIe != null)
+                        localIe.Dispose();
+                });
+
                 var cts = new CancellationTokenDisposable();
                 var cts = new CancellationTokenDisposable();
-                var d = new CompositeDisposable(cts, e);
+                var d = new CompositeDisposable(cts, new Disposable(disposeIe), e);
 
 
                 var outer = default(Action<TaskCompletionSource<bool>, CancellationToken>);
                 var outer = default(Action<TaskCompletionSource<bool>, CancellationToken>);
                 var inner = default(Action<TaskCompletionSource<bool>, CancellationToken>);
                 var inner = default(Action<TaskCompletionSource<bool>, CancellationToken>);
@@ -265,7 +282,7 @@ namespace System.Linq
                             }
                             }
                             else
                             else
                             {
                             {
-                                ie = null;
+                                disposeIe();
                                 outer(tcs, ct);
                                 outer(tcs, ct);
                             }
                             }
                         });
                         });
@@ -321,12 +338,30 @@ namespace System.Linq
 
 
             return Create(() =>
             return Create(() =>
             {
             {
+                // A lock seems inevitable. Disposal of the outer enumerator and completion of
+                // MoveNext of the inner enumerator can happen concurrently.
+                var syncRoot = new object();
                 var e = source.GetEnumerator();
                 var e = source.GetEnumerator();
                 var ie = default(IAsyncEnumerator<TResult>);
                 var ie = default(IAsyncEnumerator<TResult>);
+
+                var disposeIe = new Action(() =>
+                {
+                    IAsyncEnumerator<TResult> localIe;
+
+                    lock (syncRoot)
+                    {
+                        localIe = ie;
+                        ie = null;
+                    }
+
+                    if (localIe != null)
+                        localIe.Dispose();
+                });
+
                 var index = 0;
                 var index = 0;
 
 
                 var cts = new CancellationTokenDisposable();
                 var cts = new CancellationTokenDisposable();
-                var d = new CompositeDisposable(cts, e);
+                var d = new CompositeDisposable(cts, new Disposable(disposeIe), e);
 
 
                 var outer = default(Action<TaskCompletionSource<bool>, CancellationToken>);
                 var outer = default(Action<TaskCompletionSource<bool>, CancellationToken>);
                 var inner = default(Action<TaskCompletionSource<bool>, CancellationToken>);
                 var inner = default(Action<TaskCompletionSource<bool>, CancellationToken>);
@@ -343,7 +378,7 @@ namespace System.Linq
                             }
                             }
                             else
                             else
                             {
                             {
-                                ie = null;
+                                disposeIe();
                                 outer(tcs, ct);
                                 outer(tcs, ct);
                             }
                             }
                         });
                         });