Преглед изворни кода

Ix: Allow Share to be disposed, Make Finally react to token cancellation (#627)

David Karnok пре 7 година
родитељ
комит
cdb3993879

+ 12 - 3
Ix.NET/Source/System.Interactive.Async.Tests/AsyncTests.Exceptions.cs

@@ -393,12 +393,12 @@ namespace Tests
             Assert.True(b);
         }
 
-        //[Fact]
+        [Fact]
         public void Finally6()
         {
             var b = false;
 
-            var xs = new[] { 1, 2 }.ToAsyncEnumerable().Finally(() => { b = true; });
+            var xs = new[] { 1, 2 }.ToAsyncEnumerable().Finally(() => { Volatile.Write(ref b, true); });
 
             var e = xs.GetEnumerator();
 
@@ -408,7 +408,16 @@ namespace Tests
             cts.Cancel();
             t.Wait(WaitTimeoutMs);
 
-            Assert.True(b);
+            for (int i = 0; i < WaitTimeoutMs / 100; i++)
+            {
+                if (Volatile.Read(ref b))
+                {
+                    return;
+                }
+                Thread.Sleep(100);
+            }
+
+            Assert.True(true, "Timeout while waiting for b to become true.");
         }
 
         [Fact]

+ 43 - 13
Ix.NET/Source/System.Interactive.Async/Finally.cs

@@ -28,6 +28,10 @@ namespace System.Linq
 
             private IAsyncEnumerator<TSource> enumerator;
 
+            CancellationTokenRegistration _tokenRegistration;
+
+            int _once;
+
             public FinallyAsyncIterator(IAsyncEnumerable<TSource> source, Action finallyAction)
             {
                 Debug.Assert(source != null);
@@ -44,15 +48,24 @@ namespace System.Linq
 
             public override void Dispose()
             {
-                if (enumerator != null)
+                // This could now be executed by either MoveNextCore
+                // or the trigger from a CancellationToken
+                // make sure this happens at most once.
+                if (Interlocked.CompareExchange(ref _once, 1, 0) == 0)
                 {
-                    enumerator.Dispose();
-                    enumerator = null;
+                    if (enumerator != null)
+                    {
+                        enumerator.Dispose();
+                        // make sure the clearing of the enumerator
+                        // becomes visible to MoveNextCore
+                        Volatile.Write(ref enumerator, null);
 
-                    finallyAction();
-                }
+                        finallyAction();
+                    }
 
-                base.Dispose();
+                    base.Dispose();
+                    _tokenRegistration.Dispose();
+                }
             }
 
             protected override async Task<bool> MoveNextCore(CancellationToken cancellationToken)
@@ -65,14 +78,31 @@ namespace System.Linq
                         goto case AsyncIteratorState.Iterating;
 
                     case AsyncIteratorState.Iterating:
-                        if (await enumerator.MoveNext(cancellationToken)
-                                            .ConfigureAwait(false))
+                        // clear any previous registration
+                        _tokenRegistration.Dispose();
+                        // and setup a new registration
+                        // we can't know if the token is the same as last time
+                        // note that the registration extends the lifetime of "this"
+                        // so the current AsyncIterator better not be just abandoned
+                        _tokenRegistration = cancellationToken.Register(
+                            state => ((FinallyAsyncIterator<TSource>)state).Dispose(), this);
+
+                        // Now that the CancellationToken may call Dispose
+                        // from any thread while the current thread is in
+                        // MoveNextCore, we must make sure the enumerator
+                        // hasn't been cleared out in the meantime
+                        var en = Volatile.Read(ref enumerator);
+                        if (en != null)
                         {
-                            current = enumerator.Current;
-                            return true;
-                        }
+                            if (await en.MoveNext(cancellationToken)
+                                                .ConfigureAwait(false))
+                            {
+                                current = enumerator.Current;
+                                return true;
+                            }
 
-                        Dispose();
+                            Dispose();
+                        }
                         break;
                 }
 
@@ -80,4 +110,4 @@ namespace System.Linq
             }
         }
     }
-}
+}

+ 1 - 1
Ix.NET/Source/System.Interactive.Tests/Tests.Buffering.cs

@@ -65,7 +65,7 @@ namespace Tests
             NoNext(e1);
         }
 
-        //[Fact]
+        [Fact]
         public void Share4()
         {
             var rng = Enumerable.Range(0, 5).Share();

+ 51 - 12
Ix.NET/Source/System.Interactive/Share.cs

@@ -102,27 +102,66 @@ namespace System.Linq
 
             private IEnumerator<T> GetEnumerator_()
             {
-                while (true)
+                return new ShareEnumerator(this);
+            }
+
+            sealed class ShareEnumerator : IEnumerator<T>
+            {
+                readonly SharedBuffer<T> _parent;
+
+                T _current;
+
+                bool _disposed;
+
+                public ShareEnumerator(SharedBuffer<T> parent)
+                {
+                    _parent = parent;
+                }
+
+                public T Current => _current;
+
+                object IEnumerator.Current => _current;
+
+                public void Dispose()
+                {
+                    _disposed = true;
+                }
+
+                public bool MoveNext()
                 {
                     if (_disposed)
+                    {
+                        return false;
+                    }
+                    if (_parent._disposed)
+                    {
                         throw new ObjectDisposedException("");
+                    }
 
-                    var hasValue = default(bool);
-                    var current = default(T);
-
-                    lock (_source)
+                    var hasValue = false;
+                    var src = _parent._source;
+                    lock (src)
                     {
-                        hasValue = _source.MoveNext();
+                        hasValue = src.MoveNext();
                         if (hasValue)
-                            current = _source.Current;
+                        {
+                            _current = src.Current;
+                        }
                     }
-
                     if (hasValue)
-                        yield return current;
-                    else
-                        break;
+                    {
+                        return true;
+                    }
+                    _disposed = true;
+                    _current = default(T);
+                    return false;
+                }
+
+                public void Reset()
+                {
+                    throw new NotSupportedException();
                 }
             }
         }
     }
-}
+}