Bart De Smet před 7 roky
rodič
revize
cf4bc0852c

+ 1 - 1
Ix.NET/Source/Benchmarks.System.Interactive/Benchmarks.System.Interactive.csproj

@@ -20,7 +20,7 @@
   </PropertyGroup>
 
   <ItemGroup>
-    <PackageReference Include="BenchmarkDotNet" Version="0.10.14" />
+    <PackageReference Include="BenchmarkDotNet" Version="0.11.2" />
   </ItemGroup>
 
   <ItemGroup Condition="'$(Configuration)|$(Platform)'=='Ix.net 3.1.1|AnyCPU'">

+ 1 - 1
Ix.NET/Source/Benchmarks.System.Interactive/Program.cs

@@ -21,7 +21,7 @@ namespace Benchmarks.System.Interactive
                 typeof(IgnoreElementsBenchmark),
                 typeof(DeferBenchmark),
                 typeof(RetryBenchmark),
-                typeof(MinMaxBenchmark),
+                typeof(MinMaxBenchmark)
             });
 
             switcher.Run();

+ 36 - 1
Ix.NET/Source/System.Interactive.Async.Tests/System/Linq/Operators/Never.cs

@@ -1,8 +1,10 @@
-// Licensed to the .NET Foundation under one or more agreements.
+// Licensed to the .NET Foundation under one or more agreements.
 // The .NET Foundation licenses this file to you under the Apache 2.0 License.
 // See the LICENSE file in the project root for more information. 
 
+using System;
 using System.Linq;
+using System.Threading;
 using System.Threading.Tasks;
 using Xunit;
 
@@ -19,5 +21,38 @@ namespace Tests
             Assert.False(e.MoveNextAsync().IsCompleted); // Very rudimentary check
             await e.DisposeAsync();
         }
+
+        [Fact]
+        public void CancelToken_Unblocks()
+        {
+            var cts = new CancellationTokenSource();
+
+            var en = AsyncEnumerableEx.Never<int>().GetAsyncEnumerator(cts.Token);
+
+            try
+            {
+                var t = Task.Run(async () =>
+                {
+                    await Task.Delay(100);
+                    cts.Cancel();
+                });
+
+                try
+                {
+                    Assert.True(en.MoveNextAsync().AsTask().Wait(2000));
+                }
+                catch (AggregateException ex)
+                {
+                    if (!(ex.InnerException is TaskCanceledException))
+                    {
+                        throw;
+                    }
+                }
+            }
+            finally
+            {
+                en.DisposeAsync().AsTask().Wait(2000);
+            }
+        }
     }
 }

+ 50 - 8
Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/Never.cs

@@ -3,6 +3,8 @@
 // See the LICENSE file in the project root for more information. 
 
 using System.Collections.Generic;
+using System.Runtime.CompilerServices;
+using System.Threading;
 using System.Threading.Tasks;
 
 namespace System.Linq
@@ -11,14 +13,54 @@ namespace System.Linq
     {
         public static IAsyncEnumerable<TValue> Never<TValue>()
         {
-            //
-            // REVIEW: The C# 8.0 proposed interfaces don't allow for cancellation, so this "Never" is
-            //         as never as never can be; it can't be interrupted *at all*, similar to the sync
-            //         variant in Ix. Passing a *hot* CancellationToken to the Never operator doesn't
-            //         seem correct either, given that we return a *cold* sequence.
-            //
-
-            return AsyncEnumerable.CreateEnumerable(_ => AsyncEnumerator.Create<TValue>(() => TaskExt.Never, current: null, dispose: null));
+            return NeverAsyncEnumerable<TValue>.Instance;
+        }
+
+        private sealed class NeverAsyncEnumerable<TValue> : IAsyncEnumerable<TValue>
+        {
+            internal static readonly NeverAsyncEnumerable<TValue> Instance = new NeverAsyncEnumerable<TValue>();
+
+            public IAsyncEnumerator<TValue> GetAsyncEnumerator(CancellationToken cancellationToken)
+            {
+                return new NeverAsyncEnumerator(cancellationToken);
+            }
+
+            private sealed class NeverAsyncEnumerator : IAsyncEnumerator<TValue>
+            {
+                public TValue Current => throw new InvalidOperationException();
+
+                private readonly CancellationToken _token;
+
+                private CancellationTokenRegistration _registration;
+
+                private bool _once;
+
+                private TaskCompletionSource<bool> _task;
+
+                public NeverAsyncEnumerator(CancellationToken token)
+                {
+                    _token = token;
+                }
+
+                public ValueTask DisposeAsync()
+                {
+                    _registration.Dispose();
+                    _task = null;
+                    return default;
+                }
+
+                public ValueTask<bool> MoveNextAsync()
+                {
+                    if (_once)
+                    {
+                        return new ValueTask<bool>(false);
+                    }
+                    _once = true;
+                    _task = new TaskCompletionSource<bool>();
+                    _registration = _token.Register(state => ((NeverAsyncEnumerator)state)._task.SetCanceled(), this);
+                    return new ValueTask<bool>(_task.Task);
+                }
+            }
         }
     }
 }

+ 48 - 1
Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/Return.cs

@@ -3,6 +3,8 @@
 // See the LICENSE file in the project root for more information. 
 
 using System.Collections.Generic;
+using System.Threading;
+using System.Threading.Tasks;
 
 namespace System.Linq
 {
@@ -10,7 +12,52 @@ namespace System.Linq
     {
         public static IAsyncEnumerable<TValue> Return<TValue>(TValue value)
         {
-            return new[] { value }.ToAsyncEnumerable();
+            return new ReturnEnumerable<TValue>(value);
+        }
+
+        // FIXME: AsyncListPartition is internal to the project System.Linq.Async
+        // project, not sure how to expose it here
+        private sealed class ReturnEnumerable<TValue> : IAsyncEnumerable<TValue>
+        {
+            private readonly TValue _value;
+
+            public ReturnEnumerable(TValue value)
+            {
+                _value = value;
+            }
+
+            public IAsyncEnumerator<TValue> GetAsyncEnumerator(CancellationToken cancellationToken)
+            {
+                return new ReturnEnumerator(_value);
+            }
+
+            private sealed class ReturnEnumerator : IAsyncEnumerator<TValue>
+            {
+                public TValue Current { get; private set; }
+
+                private bool _once;
+
+                public ReturnEnumerator(TValue current)
+                {
+                    Current = current;
+                }
+
+                public ValueTask DisposeAsync()
+                {
+                    Current = default;
+                    return default;
+                }
+
+                public ValueTask<bool> MoveNextAsync()
+                {
+                    if (_once)
+                    {
+                        return new ValueTask<bool>(false);
+                    }
+                    _once = true;
+                    return new ValueTask<bool>(true);
+                }
+            }
         }
     }
 }

+ 42 - 0
Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/Throw.cs

@@ -3,6 +3,7 @@
 // See the LICENSE file in the project root for more information. 
 
 using System.Collections.Generic;
+using System.Threading;
 using System.Threading.Tasks;
 
 namespace System.Linq
@@ -33,5 +34,46 @@ namespace System.Linq
                     dispose: null)
             );
         }
+
+        private sealed class ThrowEnumerable<TValue> : IAsyncEnumerable<TValue>
+        {
+            private readonly ValueTask<bool> _moveNextThrows;
+
+            public ThrowEnumerable(ValueTask<bool> moveNextThrows)
+            {
+                _moveNextThrows = moveNextThrows;
+            }
+
+            public IAsyncEnumerator<TValue> GetAsyncEnumerator(CancellationToken cancellationToken)
+            {
+                return new ThrowEnumerator(_moveNextThrows);
+            }
+
+            private sealed class ThrowEnumerator : IAsyncEnumerator<TValue>
+            {
+                private ValueTask<bool> _moveNextThrows;
+
+                public ThrowEnumerator(ValueTask<bool> moveNextThrows)
+                {
+                    _moveNextThrows = moveNextThrows;
+                }
+
+                public TValue Current => default;
+
+                public ValueTask DisposeAsync()
+                {
+                    _moveNextThrows = new ValueTask<bool>(false);
+                    return default;
+                }
+
+                public ValueTask<bool> MoveNextAsync()
+                {
+                    var result = _moveNextThrows;
+                    _moveNextThrows = new ValueTask<bool>(false);
+                    return result;
+                }
+
+            }
+        }
     }
 }

+ 15 - 2
Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/Timeout.cs

@@ -30,6 +30,8 @@ namespace System.Linq
 
             private IAsyncEnumerator<TSource> _enumerator;
 
+            private Task _loserTask;
+
             public TimeoutAsyncIterator(IAsyncEnumerable<TSource> source, TimeSpan timeout)
             {
                 Debug.Assert(source != null);
@@ -45,7 +47,13 @@ namespace System.Linq
 
             public override async ValueTask DisposeAsync()
             {
-                if (_enumerator != null)
+                if (_loserTask != null)
+                {
+                    await _loserTask.ConfigureAwait(false);
+                    _loserTask = null;
+                    _enumerator = null;
+                }
+                else if (_enumerator != null)
                 {
                     await _enumerator.DisposeAsync().ConfigureAwait(false);
                     _enumerator = null;
@@ -73,10 +81,15 @@ namespace System.Linq
                             {
                                 var delay = Task.Delay(_timeout, delayCts.Token);
 
-                                var winner = await Task.WhenAny(moveNext.AsTask(), delay).ConfigureAwait(false);
+                                var next = moveNext.AsTask();
+
+                                var winner = await Task.WhenAny(next, delay).ConfigureAwait(false);
 
                                 if (winner == delay)
                                 {
+                                    // we still have to wait for the "next" to complete
+                                    // before we can dispose _enumerator
+                                    _loserTask = next.ContinueWith(async (t, state) => await ((IAsyncDisposable)state).DisposeAsync(), _enumerator);
                                     throw new TimeoutException();
                                 }