浏览代码

Dispose inner IAsyncEnumerators in AsyncEnumerable.SelectMany.

Daniel Weber 11 年之前
父节点
当前提交
d865f9d714
共有 1 个文件被更改,包括 35 次插入4 次删除
  1. 35 4
      Ix.NET/Source/System.Interactive.Async/AsyncEnumerable.Single.cs

+ 35 - 4
Ix.NET/Source/System.Interactive.Async/AsyncEnumerable.Single.cs

@@ -244,11 +244,26 @@ namespace System.Linq
 
             return Create(() =>
             {
+                // A lock seems inevitable. Disposal of the outer enumerator and completion of
+                // MoveNext of the inner enumerator can happen concurrently.
+                var syncRoot = new object();
                 var e = source.GetEnumerator();
                 var ie = default(IAsyncEnumerator<TResult>);
 
+                var disposeIe = new Action(() =>
+                {
+                    lock (syncRoot)
+                    {
+                        if (ie != null)
+                        {
+                            ie.Dispose();
+                            ie = null;
+                        }
+                    }
+                });
+
                 var cts = new CancellationTokenDisposable();
-                var d = new CompositeDisposable(cts, e);
+                var d = new CompositeDisposable(cts, new Disposable(disposeIe), e);
 
                 var outer = default(Action<TaskCompletionSource<bool>, CancellationToken>);
                 var inner = default(Action<TaskCompletionSource<bool>, CancellationToken>);
@@ -265,7 +280,7 @@ namespace System.Linq
                             }
                             else
                             {
-                                ie = null;
+                                disposeIe();
                                 outer(tcs, ct);
                             }
                         });
@@ -321,12 +336,28 @@ namespace System.Linq
 
             return Create(() =>
             {
+                // A lock seems inevitable. Disposal of the outer enumerator and completion of
+                // MoveNext of the inner enumerator can happen concurrently.
+                var syncRoot = new object();
                 var e = source.GetEnumerator();
                 var ie = default(IAsyncEnumerator<TResult>);
+
+                var disposeIe = new Action(() =>
+                {
+                    lock (syncRoot)
+                    {
+                        if (ie != null)
+                        {
+                            ie.Dispose();
+                            ie = null;
+                        }
+                    }
+                });
+
                 var index = 0;
 
                 var cts = new CancellationTokenDisposable();
-                var d = new CompositeDisposable(cts, e);
+                var d = new CompositeDisposable(cts, new Disposable(disposeIe), e);
 
                 var outer = default(Action<TaskCompletionSource<bool>, CancellationToken>);
                 var inner = default(Action<TaskCompletionSource<bool>, CancellationToken>);
@@ -343,7 +374,7 @@ namespace System.Linq
                             }
                             else
                             {
-                                ie = null;
+                                disposeIe();
                                 outer(tcs, ct);
                             }
                         });