Răsfoiți Sursa

Fix for AsyncEnumerable.SelectMany dispose behavior + idempotency for Dispose operations.

Bart De Smet 10 ani în urmă
părinte
comite
5260b7a82a

+ 11 - 35
Ix.NET/Source/System.Interactive.Async/AsyncEnumerable.Single.cs

@@ -244,27 +244,13 @@ namespace System.Linq
 
             return Create(() =>
             {
-                // A lock seems inevitable. Disposal of the outer enumerator and completion of
-                // MoveNext of the inner enumerator can happen concurrently.
-                var syncRoot = new object();
                 var e = source.GetEnumerator();
                 var ie = default(IAsyncEnumerator<TResult>);
 
-                var disposeIe = new Action(() =>
-                {
-                    var localIe = default(IAsyncEnumerator<TResult>);
-
-                    lock (syncRoot)
-                    {
-                        localIe = ie;
-                    }
-
-                    if (localIe != null)
-                        localIe.Dispose();
-                });
+                var innerDisposable = new AssignableDisposable();
 
                 var cts = new CancellationTokenDisposable();
-                var d = Disposable.Create(cts, new Disposable(disposeIe), e);
+                var d = Disposable.Create(cts, innerDisposable, e);
 
                 var outer = default(Action<TaskCompletionSource<bool>, CancellationToken>);
                 var inner = default(Action<TaskCompletionSource<bool>, CancellationToken>);
@@ -281,7 +267,7 @@ namespace System.Linq
                             }
                             else
                             {
-                                disposeIe();
+                                innerDisposable.Disposable = null;
                                 outer(tcs, ct);
                             }
                         });
@@ -299,6 +285,8 @@ namespace System.Linq
                                 try
                                 {
                                     ie = selector(e.Current).GetEnumerator();
+                                    innerDisposable.Disposable = ie;
+
                                     inner(tcs, ct);
                                 }
                                 catch (Exception ex)
@@ -337,29 +325,15 @@ namespace System.Linq
 
             return Create(() =>
             {
-                // A lock seems inevitable. Disposal of the outer enumerator and completion of
-                // MoveNext of the inner enumerator can happen concurrently.
-                var syncRoot = new object();
                 var e = source.GetEnumerator();
                 var ie = default(IAsyncEnumerator<TResult>);
 
-                var disposeIe = new Action(() =>
-                {
-                    var localIe = default(IAsyncEnumerator<TResult>);
-
-                    lock (syncRoot)
-                    {
-                        localIe = ie;
-                    }
-
-                    if (localIe != null)
-                        localIe.Dispose();
-                });
-
                 var index = 0;
 
+                var innerDisposable = new AssignableDisposable();
+
                 var cts = new CancellationTokenDisposable();
-                var d = Disposable.Create(cts, new Disposable(disposeIe), e);
+                var d = Disposable.Create(cts, innerDisposable, e);
 
                 var outer = default(Action<TaskCompletionSource<bool>, CancellationToken>);
                 var inner = default(Action<TaskCompletionSource<bool>, CancellationToken>);
@@ -376,7 +350,7 @@ namespace System.Linq
                             }
                             else
                             {
-                                disposeIe();
+                                innerDisposable.Disposable = null;
                                 outer(tcs, ct);
                             }
                         });
@@ -394,6 +368,8 @@ namespace System.Linq
                                 try
                                 {
                                     ie = selector(e.Current, checked(index++)).GetEnumerator();
+                                    innerDisposable.Disposable = ie;
+
                                     inner(tcs, ct);
                                 }
                                 catch (Exception ex)

+ 47 - 17
Ix.NET/Source/System.Interactive.Async/Disposables.cs

@@ -1,6 +1,4 @@
 // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
-using System;
-using System.Collections.Generic;
 using System.Threading;
 
 namespace System.Linq
@@ -9,18 +7,27 @@ namespace System.Linq
     {
         private readonly CancellationTokenSource cts = new CancellationTokenSource();
 
-        public CancellationToken Token { get { return cts.Token; } }
+        public CancellationToken Token
+        {
+            get
+            {
+                return cts.Token;
+            }
+        }
 
         public void Dispose()
         {
             if (!cts.IsCancellationRequested)
+            {
                 cts.Cancel();
+            }
         }
     }
 
     class CompositeDisposable : IDisposable
     {
-        private readonly IDisposable[] _dispose;
+        private static IDisposable[] s_empty = new IDisposable[0];
+        private IDisposable[] _dispose;
 
         public CompositeDisposable(params IDisposable[] dispose)
         {
@@ -29,8 +36,12 @@ namespace System.Linq
 
         public void Dispose()
         {
-            foreach (var d in _dispose)
+            var dispose = Interlocked.Exchange(ref _dispose, s_empty);
+
+            foreach (var d in dispose)
+            {
                 d.Dispose();
+            }
         }
     }
 
@@ -46,13 +57,14 @@ namespace System.Linq
             {
                 lock (_gate)
                 {
-                    if (_disposable != null)
-                        _disposable.Dispose();
+                    DisposeInner();
 
                     _disposable = value;
 
                     if (_disposed)
-                        _disposable.Dispose();
+                    {
+                        DisposeInner();
+                    }
                 }
             }
         }
@@ -64,17 +76,25 @@ namespace System.Linq
                 if (!_disposed)
                 {
                     _disposed = true;
-
-                    if (_disposable != null)
-                        _disposable.Dispose();
+                    DisposeInner();
                 }
             }
         }
+
+        private void DisposeInner()
+        {
+            if (_disposable != null)
+            {
+                _disposable.Dispose();
+                _disposable = null;
+            }
+        }
     }
 
     class Disposable : IDisposable
     {
-        private readonly Action _dispose;
+        private static Action s_nop = () => { };
+        private Action _dispose;
 
         public Disposable(Action dispose)
         {
@@ -93,14 +113,15 @@ namespace System.Linq
 
         public void Dispose()
         {
-            _dispose();
+            var dispose = Interlocked.Exchange(ref _dispose, s_nop);
+            dispose();
         }
     }
 
     class BinaryDisposable : IDisposable
     {
-        private readonly IDisposable _d1;
-        private readonly IDisposable _d2;
+        private IDisposable _d1;
+        private IDisposable _d2;
 
         public BinaryDisposable(IDisposable d1, IDisposable d2)
         {
@@ -110,8 +131,17 @@ namespace System.Linq
 
         public void Dispose()
         {
-            _d1.Dispose();
-            _d2.Dispose();
+            var d1 = Interlocked.Exchange(ref _d1, null);
+            if (d1 != null)
+            {
+                d1.Dispose();
+
+                var d2 = Interlocked.Exchange(ref _d2, null);
+                if (d2 != null)
+                {
+                    d2.Dispose();
+                }
+            }
         }
     }
 }

+ 54 - 0
Ix.NET/Source/Tests/AsyncTests.Bugs.cs

@@ -219,6 +219,60 @@ namespace Tests
 
             Assert.AreEqual("Check", enumerable.First().Result);
         }
+
+        [TestMethod]
+        public void SelectManyDisposeInvokedOnlyOnce()
+        {
+            var disposeCounter = new DisposeCounter();
+
+            var result = AsyncEnumerable.Return(1).SelectMany(i => disposeCounter).Select(i => i).ToList().Result;
+
+            Assert.AreEqual(0, result.Count);
+            Assert.AreEqual(1, disposeCounter.DisposeCount);
+        }
+
+        [TestMethod]
+        public void SelectManyInnerDispose()
+        {
+            var disposes = Enumerable.Range(0, 10).Select(_ => new DisposeCounter()).ToList();
+
+            var result = AsyncEnumerable.Range(0, 10).SelectMany(i => disposes[i]).Select(i => i).ToList().Result;
+
+            Assert.AreEqual(0, result.Count);
+            Assert.IsTrue(disposes.All(d => d.DisposeCount == 1));
+        }
+
+        private class DisposeCounter : IAsyncEnumerable<object>
+        {
+            public int DisposeCount { get; private set; }
+
+            public IAsyncEnumerator<object> GetEnumerator()
+            {
+                return new Enumerator(this);
+            }
+
+            private class Enumerator : IAsyncEnumerator<object>
+            {
+                private readonly DisposeCounter _disposeCounter;
+
+                public Enumerator(DisposeCounter disposeCounter)
+                {
+                    _disposeCounter = disposeCounter;
+                }
+
+                public void Dispose()
+                {
+                    _disposeCounter.DisposeCount++;
+                }
+
+                public Task<bool> MoveNext(CancellationToken _)
+                {
+                    return Task.FromResult(false);
+                }
+
+                public object Current { get; }
+            }
+        }
     }
 
     static class MyExt