Ver Fonte

AsyncIx: More reviews/fixes/proposed changes

akarnokd há 7 anos atrás
pai
commit
b33b184a22

+ 40 - 1
Ix.NET/Source/System.Interactive.Async.Tests/System/Linq/Operators/Finally.cs

@@ -114,7 +114,46 @@ namespace Tests
             var xs = new[] { 1, 2 }.ToAsyncEnumerable().Finally(() => { i++; });
 
             await SequenceIdentity(xs);
-            Assert.Equal(2, i);
+            Assert.Equal(4, i);
+        }
+
+        [Fact]
+        public async Task Finally7_Async()
+        {
+            var i = 0;
+            var xs = new[] { 1, 2 }.ToAsyncEnumerable().Finally(async () => {
+                await Task.CompletedTask;
+                i++;
+            });
+
+            await SequenceIdentity(xs);
+            Assert.Equal(4, i);
+        }
+
+        [Fact]
+        public async Task Finally8()
+        {
+            var i = 0;
+            var en = AsyncEnumerable.Range(1, 5).Finally(() => i++).GetAsyncEnumerator();
+
+            await en.DisposeAsync();
+
+            Assert.Equal(1, i);
+        }
+
+        [Fact]
+        public async Task Finally8_Async()
+        {
+            var i = 0;
+            var en = AsyncEnumerable.Range(1, 5).Finally(async () =>
+            {
+                await Task.CompletedTask;
+                i++;
+            }).GetAsyncEnumerator();
+
+            await en.DisposeAsync();
+
+            Assert.Equal(1, i);
         }
     }
 }

+ 34 - 0
Ix.NET/Source/System.Interactive.Async.Tests/System/Linq/Operators/Never.cs

@@ -4,6 +4,7 @@
 
 using System;
 using System.Linq;
+using System.Threading;
 using System.Threading.Tasks;
 using Xunit;
 
@@ -25,5 +26,38 @@ namespace Tests
         private void Nop(object o)
         {
         }
+
+        [Fact]
+        public void CancelToken_Unblocks()
+        {
+            var cts = new CancellationTokenSource();
+
+            var en = AsyncEnumerableEx.Never<int>().GetAsyncEnumerator(cts.Token);
+
+            try
+            {
+                var t = Task.Run(async () =>
+                {
+                    await Task.Delay(100);
+                    cts.Cancel();
+                });
+
+                try
+                {
+                    Assert.True(en.MoveNextAsync().AsTask().Wait(2000));
+                }
+                catch (AggregateException ex)
+                {
+                    if (!(ex.InnerException is TaskCanceledException))
+                    {
+                        throw;
+                    }
+                }
+            }
+            finally
+            {
+                en.DisposeAsync().AsTask().Wait(2000);
+            }
+        }
     }
 }

+ 17 - 0
Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/Finally.cs

@@ -38,6 +38,8 @@ namespace System.Linq
 
             private IAsyncEnumerator<TSource> _enumerator;
 
+            private bool _finallyOnce;
+
             public FinallyAsyncIterator(IAsyncEnumerable<TSource> source, Action finallyAction)
             {
                 Debug.Assert(source != null);
@@ -59,6 +61,13 @@ namespace System.Linq
                     await _enumerator.DisposeAsync().ConfigureAwait(false);
                     _enumerator = null;
 
+                }
+
+                // Run the _finallyAction even if
+                // the consumer did not call MoveNextAsync
+                if (!_finallyOnce)
+                {
+                    _finallyOnce = true;
                     _finallyAction();
                 }
 
@@ -96,6 +105,8 @@ namespace System.Linq
 
             private IAsyncEnumerator<TSource> _enumerator;
 
+            private bool _finallyOnce;
+
             public FinallyAsyncIteratorWithTask(IAsyncEnumerable<TSource> source, Func<Task> finallyAction)
             {
                 Debug.Assert(source != null);
@@ -116,7 +127,13 @@ namespace System.Linq
                 {
                     await _enumerator.DisposeAsync().ConfigureAwait(false);
                     _enumerator = null;
+                }
 
+                // Await the _finallyAction even if
+                // the consumer did not call MoveNextAsync
+                if (!_finallyOnce)
+                {
+                    _finallyOnce = true;
                     await _finallyAction().ConfigureAwait(false);
                 }
 

+ 39 - 8
Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/Never.cs

@@ -3,6 +3,8 @@
 // See the LICENSE file in the project root for more information. 
 
 using System.Collections.Generic;
+using System.Runtime.CompilerServices;
+using System.Threading;
 using System.Threading.Tasks;
 
 namespace System.Linq
@@ -11,14 +13,43 @@ namespace System.Linq
     {
         public static IAsyncEnumerable<TValue> Never<TValue>()
         {
-            //
-            // REVIEW: The C# 8.0 proposed interfaces don't allow for cancellation, so this "Never" is
-            //         as never as never can be; it can't be interrupted *at all*, similar to the sync
-            //         variant in Ix. Passing a *hot* CancellationToken to the Never operator doesn't
-            //         seem correct either, given that we return a *cold* sequence.
-            //
-
-            return AsyncEnumerable.CreateEnumerable(() => AsyncEnumerable.CreateEnumerator<TValue>(() => TaskExt.Never, current: null, dispose: null));
+            return NeverAsyncEnumerable<TValue>.Instance;
+        }
+
+        private sealed class NeverAsyncEnumerable<TValue> : IAsyncEnumerable<TValue>
+        {
+            internal static readonly NeverAsyncEnumerable<TValue> Instance = new NeverAsyncEnumerable<TValue>();
+
+            public IAsyncEnumerator<TValue> GetAsyncEnumerator(CancellationToken cancellationToken = default)
+            {
+                return new NeverAsyncEnumerator(cancellationToken);
+            }
+
+            private sealed class NeverAsyncEnumerator : IAsyncEnumerator<TValue>
+            {
+                public TValue Current => throw new InvalidOperationException();
+
+                private readonly CancellationToken _token;
+
+                public NeverAsyncEnumerator(CancellationToken token)
+                {
+                    _token = token;
+                }
+
+                public ValueTask DisposeAsync()
+                {
+                    return TaskExt.CompletedTask;
+                }
+
+                public ValueTask<bool> MoveNextAsync()
+                {
+                    return new ValueTask<bool>(Task.Run(async () =>
+                    {
+                        await Task.Delay(Threading.Timeout.Infinite, _token);
+                        return false;
+                    }, _token));
+                }
+            }
         }
     }
 }

+ 48 - 1
Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/Return.cs

@@ -3,6 +3,8 @@
 // See the LICENSE file in the project root for more information. 
 
 using System.Collections.Generic;
+using System.Threading;
+using System.Threading.Tasks;
 
 namespace System.Linq
 {
@@ -10,7 +12,52 @@ namespace System.Linq
     {
         public static IAsyncEnumerable<TValue> Return<TValue>(TValue value)
         {
-            return new[] { value }.ToAsyncEnumerable();
+            return new ReturnEnumerable<TValue>(value);
+        }
+
+        // FIXME: AsyncListPartition is internal to the project System.Linq.Async
+        // project, not sure how to expose it here
+        private sealed class ReturnEnumerable<TValue> : IAsyncEnumerable<TValue>
+        {
+            private readonly TValue _value;
+
+            public ReturnEnumerable(TValue value)
+            {
+                _value = value;
+            }
+
+            public IAsyncEnumerator<TValue> GetAsyncEnumerator(CancellationToken cancellationToken = default)
+            {
+                return new ReturnEnumerator(_value);
+            }
+
+            private sealed class ReturnEnumerator : IAsyncEnumerator<TValue>
+            {
+                public TValue Current { get; private set; }
+
+                private bool _once;
+
+                public ReturnEnumerator(TValue current)
+                {
+                    Current = current;
+                }
+
+                public ValueTask DisposeAsync()
+                {
+                    Current = default;
+                    return TaskExt.CompletedTask;
+                }
+
+                public ValueTask<bool> MoveNextAsync()
+                {
+                    if (_once)
+                    {
+                        return TaskExt.False;
+                    }
+                    _once = true;
+                    return TaskExt.True;
+                }
+            }
         }
     }
 }

+ 32 - 0
Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/Throw.cs

@@ -3,6 +3,7 @@
 // See the LICENSE file in the project root for more information. 
 
 using System.Collections.Generic;
+using System.Threading;
 using System.Threading.Tasks;
 
 namespace System.Linq
@@ -29,5 +30,36 @@ namespace System.Linq
                     dispose: null)
             );
         }
+
+        private sealed class ThrowEnumerable<TValue> : IAsyncEnumerable<TValue>, IAsyncEnumerator<TValue>
+        {
+            private readonly ValueTask<bool> _moveNextThrows;
+
+            public TValue Current => default;
+
+            public ThrowEnumerable(ValueTask<bool> moveNextThrows)
+            {
+                _moveNextThrows = moveNextThrows;
+            }
+
+            public IAsyncEnumerator<TValue> GetAsyncEnumerator(CancellationToken cancellationToken = default)
+            {
+                return this;
+            }
+
+            public ValueTask DisposeAsync()
+            {
+                return TaskExt.CompletedTask;
+            }
+
+            public ValueTask<bool> MoveNextAsync()
+            {
+                // May we let this fail over and over?
+                // If so, the class doesn't need extra state
+                // and thus can be reused without creating an
+                // async enumerator
+                return _moveNextThrows;
+            }
+        }
     }
 }

+ 15 - 2
Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/Timeout.cs

@@ -30,6 +30,8 @@ namespace System.Linq
 
             private IAsyncEnumerator<TSource> _enumerator;
 
+            private Task _loserTask;
+
             public TimeoutAsyncIterator(IAsyncEnumerable<TSource> source, TimeSpan timeout)
             {
                 Debug.Assert(source != null);
@@ -45,7 +47,13 @@ namespace System.Linq
 
             public override async ValueTask DisposeAsync()
             {
-                if (_enumerator != null)
+                if (_loserTask != null)
+                {
+                    await _loserTask.ConfigureAwait(false);
+                    _loserTask = null;
+                    _enumerator = null;
+                }
+                else if (_enumerator != null)
                 {
                     await _enumerator.DisposeAsync().ConfigureAwait(false);
                     _enumerator = null;
@@ -73,10 +81,15 @@ namespace System.Linq
                             {
                                 var delay = Task.Delay(_timeout, delayCts.Token);
 
-                                var winner = await Task.WhenAny(moveNext.AsTask(), delay).ConfigureAwait(false);
+                                var next = moveNext.AsTask();
+
+                                var winner = await Task.WhenAny(next, delay).ConfigureAwait(false);
 
                                 if (winner == delay)
                                 {
+                                    // we still have to wait for the "next" to complete
+                                    // before we can dispose _enumerator
+                                    _loserTask = next.ContinueWith(async (t, state) => await ((IAsyncDisposable)state).DisposeAsync(), _enumerator);
                                     throw new TimeoutException();
                                 }