Prechádzať zdrojové kódy

Prepare Ix.Async for cancellation.

Bart De Smet 6 rokov pred
rodič
commit
a63eb273c9

+ 6 - 0
Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/Amb.cs

@@ -25,9 +25,15 @@ namespace System.Linq
             if (second == null)
                 throw Error.ArgumentNull(nameof(second));
 
+#if HAS_ASYNC_ENUMERABLE_CANCELLATION
+            return Core(first, second);
+
+            static async IAsyncEnumerable<TSource> Core(IAsyncEnumerable<TSource> first, IAsyncEnumerable<TSource> second, [System.Runtime.CompilerServices.EnumeratorCancellation]CancellationToken cancellationToken = default)
+#else
             return AsyncEnumerable.Create(Core);
 
             async IAsyncEnumerator<TSource> Core(CancellationToken cancellationToken)
+#endif
             {
                 IAsyncEnumerator<TSource>? firstEnumerator = null;
                 IAsyncEnumerator<TSource>? secondEnumerator = null;

+ 12 - 0
Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/Buffer.cs

@@ -26,9 +26,15 @@ namespace System.Linq
             if (count <= 0)
                 throw Error.ArgumentOutOfRange(nameof(count));
 
+#if HAS_ASYNC_ENUMERABLE_CANCELLATION
+            return Core(source, count);
+
+            static async IAsyncEnumerable<IList<TSource>> Core(IAsyncEnumerable<TSource> source, int count, [System.Runtime.CompilerServices.EnumeratorCancellation]CancellationToken cancellationToken = default)
+#else
             return AsyncEnumerable.Create(Core);
 
             async IAsyncEnumerator<IList<TSource>> Core(CancellationToken cancellationToken)
+#endif
             {
                 var buffer = new List<TSource>(count);
 
@@ -70,9 +76,15 @@ namespace System.Linq
             if (skip <= 0)
                 throw Error.ArgumentOutOfRange(nameof(skip));
 
+#if HAS_ASYNC_ENUMERABLE_CANCELLATION
+            return Core(source, count, skip);
+
+            static async IAsyncEnumerable<IList<TSource>> Core(IAsyncEnumerable<TSource> source, int count, int skip, [System.Runtime.CompilerServices.EnumeratorCancellation]CancellationToken cancellationToken = default)
+#else
             return AsyncEnumerable.Create(Core);
 
             async IAsyncEnumerator<IList<TSource>> Core(CancellationToken cancellationToken)
+#endif
             {
                 var buffers = new Queue<IList<TSource>>();
 

+ 18 - 0
Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/Catch.cs

@@ -33,9 +33,15 @@ namespace System.Linq
             if (handler == null)
                 throw Error.ArgumentNull(nameof(handler));
 
+#if HAS_ASYNC_ENUMERABLE_CANCELLATION
+            return Core(source, handler);
+
+            static async IAsyncEnumerable<TSource> Core(IAsyncEnumerable<TSource> source, Func<TException, IAsyncEnumerable<TSource>> handler, [System.Runtime.CompilerServices.EnumeratorCancellation]CancellationToken cancellationToken = default)
+#else
             return AsyncEnumerable.Create(Core);
 
             async IAsyncEnumerator<TSource> Core(CancellationToken cancellationToken)
+#endif
             {
                 // REVIEW: This implementation mirrors the Ix implementation, which does not protect GetEnumerator
                 //         using the try statement either. A more trivial implementation would use await foreach
@@ -96,9 +102,15 @@ namespace System.Linq
             if (handler == null)
                 throw Error.ArgumentNull(nameof(handler));
 
+#if HAS_ASYNC_ENUMERABLE_CANCELLATION
+            return Core(source, handler);
+
+            static async IAsyncEnumerable<TSource> Core(IAsyncEnumerable<TSource> source, Func<TException, ValueTask<IAsyncEnumerable<TSource>>> handler, [System.Runtime.CompilerServices.EnumeratorCancellation]CancellationToken cancellationToken = default)
+#else
             return AsyncEnumerable.Create(Core);
 
             async IAsyncEnumerator<TSource> Core(CancellationToken cancellationToken)
+#endif
             {
                 // REVIEW: This implementation mirrors the Ix implementation, which does not protect GetEnumerator
                 //         using the try statement either. A more trivial implementation would use await foreach
@@ -160,9 +172,15 @@ namespace System.Linq
             if (handler == null)
                 throw Error.ArgumentNull(nameof(handler));
 
+#if HAS_ASYNC_ENUMERABLE_CANCELLATION
+            return Core(source, handler);
+
+            static async IAsyncEnumerable<TSource> Core(IAsyncEnumerable<TSource> source, Func<TException, CancellationToken, ValueTask<IAsyncEnumerable<TSource>>> handler, [System.Runtime.CompilerServices.EnumeratorCancellation]CancellationToken cancellationToken = default)
+#else
             return AsyncEnumerable.Create(Core);
 
             async IAsyncEnumerator<TSource> Core(CancellationToken cancellationToken)
+#endif
             {
                 // REVIEW: This implementation mirrors the Ix implementation, which does not protect GetEnumerator
                 //         using the try statement either. A more trivial implementation would use await foreach

+ 18 - 0
Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/Concat.cs

@@ -22,9 +22,15 @@ namespace System.Linq
             if (sources == null)
                 throw Error.ArgumentNull(nameof(sources));
 
+#if HAS_ASYNC_ENUMERABLE_CANCELLATION
+            return Core(sources);
+
+            static async IAsyncEnumerable<TSource> Core(IAsyncEnumerable<IAsyncEnumerable<TSource>> sources, [System.Runtime.CompilerServices.EnumeratorCancellation]CancellationToken cancellationToken = default)
+#else
             return AsyncEnumerable.Create(Core);
 
             async IAsyncEnumerator<TSource> Core(CancellationToken cancellationToken)
+#endif
             {
                 await foreach (var source in sources.WithCancellation(cancellationToken).ConfigureAwait(false))
                 {
@@ -48,9 +54,15 @@ namespace System.Linq
             if (sources == null)
                 throw Error.ArgumentNull(nameof(sources));
 
+#if HAS_ASYNC_ENUMERABLE_CANCELLATION
+            return Core(sources);
+
+            static async IAsyncEnumerable<TSource> Core(IEnumerable<IAsyncEnumerable<TSource>> sources, [System.Runtime.CompilerServices.EnumeratorCancellation]CancellationToken cancellationToken = default)
+#else
             return AsyncEnumerable.Create(Core);
 
             async IAsyncEnumerator<TSource> Core(CancellationToken cancellationToken)
+#endif
             {
                 foreach (var source in sources)
                 {
@@ -74,9 +86,15 @@ namespace System.Linq
             if (sources == null)
                 throw Error.ArgumentNull(nameof(sources));
 
+#if HAS_ASYNC_ENUMERABLE_CANCELLATION
+            return Core(sources);
+
+            static async IAsyncEnumerable<TSource> Core(IAsyncEnumerable<TSource>[] sources, [System.Runtime.CompilerServices.EnumeratorCancellation]CancellationToken cancellationToken = default)
+#else
             return AsyncEnumerable.Create(Core);
 
             async IAsyncEnumerator<TSource> Core(CancellationToken cancellationToken)
+#endif
             {
                 foreach (var source in sources)
                 {

+ 18 - 0
Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/Defer.cs

@@ -22,9 +22,15 @@ namespace System.Linq
             if (factory == null)
                 throw Error.ArgumentNull(nameof(factory));
 
+#if HAS_ASYNC_ENUMERABLE_CANCELLATION
+            return Core(factory);
+
+            static async IAsyncEnumerable<TSource> Core(Func<IAsyncEnumerable<TSource>> factory, [System.Runtime.CompilerServices.EnumeratorCancellation]CancellationToken cancellationToken = default)
+#else
             return AsyncEnumerable.Create(Core);
 
             async IAsyncEnumerator<TSource> Core(CancellationToken cancellationToken)
+#endif
             {
                 await foreach (var item in factory().WithCancellation(cancellationToken).ConfigureAwait(false))
                 {
@@ -46,9 +52,15 @@ namespace System.Linq
             if (factory == null)
                 throw Error.ArgumentNull(nameof(factory));
 
+#if HAS_ASYNC_ENUMERABLE_CANCELLATION
+            return Core(factory);
+
+            static async IAsyncEnumerable<TSource> Core(Func<Task<IAsyncEnumerable<TSource>>> factory, [System.Runtime.CompilerServices.EnumeratorCancellation]CancellationToken cancellationToken = default)
+#else
             return AsyncEnumerable.Create(Core);
 
             async IAsyncEnumerator<TSource> Core(CancellationToken cancellationToken)
+#endif
             {
                 await foreach (var item in (await factory().ConfigureAwait(false)).WithCancellation(cancellationToken).ConfigureAwait(false))
                 {
@@ -73,9 +85,15 @@ namespace System.Linq
             if (factory == null)
                 throw Error.ArgumentNull(nameof(factory));
 
+#if HAS_ASYNC_ENUMERABLE_CANCELLATION
+            return Core(factory);
+
+            static async IAsyncEnumerable<TSource> Core(Func<CancellationToken, Task<IAsyncEnumerable<TSource>>> factory, [System.Runtime.CompilerServices.EnumeratorCancellation]CancellationToken cancellationToken = default)
+#else
             return AsyncEnumerable.Create(Core);
 
             async IAsyncEnumerator<TSource> Core(CancellationToken cancellationToken)
+#endif
             {
                 await foreach (var item in (await factory(cancellationToken).ConfigureAwait(false)).WithCancellation(cancellationToken).ConfigureAwait(false))
                 {

+ 24 - 0
Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/DistinctUntilChanged.cs

@@ -166,9 +166,15 @@ namespace System.Linq
         {
             comparer ??= EqualityComparer<TSource>.Default;
 
+#if HAS_ASYNC_ENUMERABLE_CANCELLATION
+            return Core(source, comparer);
+
+            static async IAsyncEnumerable<TSource> Core(IAsyncEnumerable<TSource> source, IEqualityComparer<TSource> comparer, [System.Runtime.CompilerServices.EnumeratorCancellation]CancellationToken cancellationToken = default)
+#else
             return AsyncEnumerable.Create(Core);
 
             async IAsyncEnumerator<TSource> Core(CancellationToken cancellationToken)
+#endif
             {
                 await using var e = source.GetConfiguredAsyncEnumerator(cancellationToken, false);
 
@@ -201,9 +207,15 @@ namespace System.Linq
         {
             comparer ??= EqualityComparer<TKey>.Default;
 
+#if HAS_ASYNC_ENUMERABLE_CANCELLATION
+            return Core(source, keySelector, comparer);
+
+            static async IAsyncEnumerable<TSource> Core(IAsyncEnumerable<TSource> source, Func<TSource, TKey> keySelector, IEqualityComparer<TKey> comparer, [System.Runtime.CompilerServices.EnumeratorCancellation]CancellationToken cancellationToken = default)
+#else
             return AsyncEnumerable.Create(Core);
 
             async IAsyncEnumerator<TSource> Core(CancellationToken cancellationToken)
+#endif
             {
                 await using var e = source.GetConfiguredAsyncEnumerator(cancellationToken, false);
 
@@ -240,9 +252,15 @@ namespace System.Linq
         {
             comparer ??= EqualityComparer<TKey>.Default;
 
+#if HAS_ASYNC_ENUMERABLE_CANCELLATION
+            return Core(source, keySelector, comparer);
+
+            static async IAsyncEnumerable<TSource> Core(IAsyncEnumerable<TSource> source, Func<TSource, ValueTask<TKey>> keySelector, IEqualityComparer<TSource> comparer, [System.Runtime.CompilerServices.EnumeratorCancellation]CancellationToken cancellationToken = default)
+#else
             return AsyncEnumerable.Create(Core);
 
             async IAsyncEnumerator<TSource> Core(CancellationToken cancellationToken)
+#endif
             {
                 await using var e = source.GetConfiguredAsyncEnumerator(cancellationToken, false);
 
@@ -280,9 +298,15 @@ namespace System.Linq
         {
             comparer ??= EqualityComparer<TKey>.Default;
 
+#if HAS_ASYNC_ENUMERABLE_CANCELLATION
+            return Core(source, keySelector, comparer);
+
+            static async IAsyncEnumerable<TSource> Core(IAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, ValueTask<TKey>> keySelector, IEqualityComparer<TSource> comparer, [System.Runtime.CompilerServices.EnumeratorCancellation]CancellationToken cancellationToken = default)
+#else
             return AsyncEnumerable.Create(Core);
 
             async IAsyncEnumerator<TSource> Core(CancellationToken cancellationToken)
+#endif
             {
                 await using var e = source.GetConfiguredAsyncEnumerator(cancellationToken, false);
 

+ 21 - 0
Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/Do.cs

@@ -299,9 +299,16 @@ namespace System.Linq
 
         private static IAsyncEnumerable<TSource> DoCore<TSource>(IAsyncEnumerable<TSource> source, Action<TSource> onNext, Action<Exception>? onError, Action? onCompleted)
         {
+#if HAS_ASYNC_ENUMERABLE_CANCELLATION
+            return Core(source, onNext, onError, onCompleted);
+
+            // TODO: Can remove local function.
+            static async IAsyncEnumerable<TSource> Core(IAsyncEnumerable<TSource> source, Action<TSource> onNext, Action<Exception>? onError, Action? onCompleted, [System.Runtime.CompilerServices.EnumeratorCancellation]CancellationToken cancellationToken = default)
+#else
             return AsyncEnumerable.Create(Core);
 
             async IAsyncEnumerator<TSource> Core(CancellationToken cancellationToken)
+#endif
             {
                 await using var e = source.GetConfiguredAsyncEnumerator(cancellationToken, false);
 
@@ -339,9 +346,16 @@ namespace System.Linq
 
         private static IAsyncEnumerable<TSource> DoCore<TSource>(IAsyncEnumerable<TSource> source, Func<TSource, Task> onNext, Func<Exception, Task>? onError, Func<Task>? onCompleted)
         {
+#if HAS_ASYNC_ENUMERABLE_CANCELLATION
+            return Core(source, onNext, onError, onCompleted);
+
+            // TODO: Can remove local function.
+            static async IAsyncEnumerable<TSource> Core(IAsyncEnumerable<TSource> source, Func<TSource, Task> onNext, Func<Exception, Task>? onError, Func<Task>? onCompleted, [System.Runtime.CompilerServices.EnumeratorCancellation]CancellationToken cancellationToken = default)
+#else
             return AsyncEnumerable.Create(Core);
 
             async IAsyncEnumerator<TSource> Core(CancellationToken cancellationToken)
+#endif
             {
                 await using var e = source.GetConfiguredAsyncEnumerator(cancellationToken, false);
 
@@ -383,9 +397,16 @@ namespace System.Linq
 #if !NO_DEEP_CANCELLATION
         private static IAsyncEnumerable<TSource> DoCore<TSource>(IAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, Task> onNext, Func<Exception, CancellationToken, Task>? onError, Func<CancellationToken, Task>? onCompleted)
         {
+#if HAS_ASYNC_ENUMERABLE_CANCELLATION
+            return Core(source, onNext, onError, onCompleted);
+
+            // TODO: Can remove local function.
+            static async IAsyncEnumerable<TSource> Core(IAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, Task> onNext, Func<Exception, CancellationToken, Task>? onError, Func<CancellationToken, Task>? onCompleted, [System.Runtime.CompilerServices.EnumeratorCancellation]CancellationToken cancellationToken = default)
+#else
             return AsyncEnumerable.Create(Core);
 
             async IAsyncEnumerator<TSource> Core(CancellationToken cancellationToken)
+#endif
             {
                 await using var e = source.GetConfiguredAsyncEnumerator(cancellationToken, false);
 

+ 18 - 0
Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/Expand.cs

@@ -24,9 +24,15 @@ namespace System.Linq
             if (selector == null)
                 throw Error.ArgumentNull(nameof(selector));
 
+#if HAS_ASYNC_ENUMERABLE_CANCELLATION
+            return Core(source, selector);
+
+            static async IAsyncEnumerable<TSource> Core(IAsyncEnumerable<TSource> source, Func<TSource, IAsyncEnumerable<TSource>> selector, [System.Runtime.CompilerServices.EnumeratorCancellation]CancellationToken cancellationToken = default)
+#else
             return AsyncEnumerable.Create(Core);
 
             async IAsyncEnumerator<TSource> Core(CancellationToken cancellationToken)
+#endif
             {
                 var queue = new Queue<IAsyncEnumerable<TSource>>();
 
@@ -58,9 +64,15 @@ namespace System.Linq
             if (selector == null)
                 throw Error.ArgumentNull(nameof(selector));
 
+#if HAS_ASYNC_ENUMERABLE_CANCELLATION
+            return Core(source, selector);
+
+            static async IAsyncEnumerable<TSource> Core(IAsyncEnumerable<TSource> source, Func<TSource, ValueTask<IAsyncEnumerable<TSource>>> selector, [System.Runtime.CompilerServices.EnumeratorCancellation]CancellationToken cancellationToken = default)
+#else
             return AsyncEnumerable.Create(Core);
 
             async IAsyncEnumerator<TSource> Core(CancellationToken cancellationToken)
+#endif
             {
                 var queue = new Queue<IAsyncEnumerable<TSource>>();
 
@@ -93,9 +105,15 @@ namespace System.Linq
             if (selector == null)
                 throw Error.ArgumentNull(nameof(selector));
 
+#if HAS_ASYNC_ENUMERABLE_CANCELLATION
+            return Core(source, selector);
+
+            static async IAsyncEnumerable<TSource> Core(IAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, ValueTask<IAsyncEnumerable<TSource>>> selector, [System.Runtime.CompilerServices.EnumeratorCancellation]CancellationToken cancellationToken = default)
+#else
             return AsyncEnumerable.Create(Core);
 
             async IAsyncEnumerator<TSource> Core(CancellationToken cancellationToken)
+#endif
             {
                 var queue = new Queue<IAsyncEnumerable<TSource>>();
 

+ 12 - 0
Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/Finally.cs

@@ -25,9 +25,15 @@ namespace System.Linq
             if (finallyAction == null)
                 throw Error.ArgumentNull(nameof(finallyAction));
 
+#if HAS_ASYNC_ENUMERABLE_CANCELLATION
+            return Core(source, finallyAction);
+
+            static async IAsyncEnumerable<TSource> Core(IAsyncEnumerable<TSource> source, Action finallyAction, [System.Runtime.CompilerServices.EnumeratorCancellation]CancellationToken cancellationToken = default)
+#else
             return AsyncEnumerable.Create(Core);
 
             async IAsyncEnumerator<TSource> Core(CancellationToken cancellationToken)
+#endif
             {
                 try
                 {
@@ -58,9 +64,15 @@ namespace System.Linq
             if (finallyAction == null)
                 throw Error.ArgumentNull(nameof(finallyAction));
 
+#if HAS_ASYNC_ENUMERABLE_CANCELLATION
+            return Core(source, finallyAction);
+
+            static async IAsyncEnumerable<TSource> Core(IAsyncEnumerable<TSource> source, Func<Task> finallyAction, [System.Runtime.CompilerServices.EnumeratorCancellation]CancellationToken cancellationToken = default)
+#else
             return AsyncEnumerable.Create(Core);
 
             async IAsyncEnumerator<TSource> Core(CancellationToken cancellationToken)
+#endif
             {
                 try
                 {

+ 7 - 1
Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/Generate.cs

@@ -31,10 +31,16 @@ namespace System.Linq
             if (resultSelector == null)
                 throw Error.ArgumentNull(nameof(resultSelector));
 
+#pragma warning disable CS1998 // Async method lacks 'await' operators and will run synchronously
+#if HAS_ASYNC_ENUMERABLE_CANCELLATION
+            return Core(initialState, condition, iterate, resultSelector);
+
+            static async IAsyncEnumerable<TResult> Core(TState initialState, Func<TState, bool> condition, Func<TState, TState> iterate, Func<TState, TResult> resultSelector, [System.Runtime.CompilerServices.EnumeratorCancellation]CancellationToken cancellationToken = default)
+#else
             return AsyncEnumerable.Create(Core);
 
-#pragma warning disable CS1998 // Async method lacks 'await' operators and will run synchronously
             async IAsyncEnumerator<TResult> Core(CancellationToken cancellationToken)
+#endif
             {
                 for (var state = initialState; condition(state); state = iterate(state))
                 {

+ 6 - 0
Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/IgnoreElements.cs

@@ -22,9 +22,15 @@ namespace System.Linq
             if (source == null)
                 throw Error.ArgumentNull(nameof(source));
 
+#if HAS_ASYNC_ENUMERABLE_CANCELLATION
+            return Core(source);
+
+            static async IAsyncEnumerable<TSource> Core(IAsyncEnumerable<TSource> source, [System.Runtime.CompilerServices.EnumeratorCancellation]CancellationToken cancellationToken = default)
+#else
             return AsyncEnumerable.Create(Core);
 
             async IAsyncEnumerator<TSource> Core(CancellationToken cancellationToken)
+#endif
             {
                 await foreach (var _ in source.WithCancellation(cancellationToken).ConfigureAwait(false))
                 {

+ 9 - 7
Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/Merge.cs

@@ -22,6 +22,15 @@ namespace System.Linq
             if (sources == null)
                 throw Error.ArgumentNull(nameof(sources));
 
+#if HAS_ASYNC_ENUMERABLE_CANCELLATION
+            return Core(sources);
+
+            static async IAsyncEnumerable<TSource> Core(IAsyncEnumerable<TSource>[] sources, [System.Runtime.CompilerServices.EnumeratorCancellation]CancellationToken cancellationToken = default)
+#else
+            return AsyncEnumerable.Create(Core);
+
+            async IAsyncEnumerator<TSource> Core(CancellationToken cancellationToken)
+#endif
 #if USE_FAIR_AND_CHEAPER_MERGE
             //
             // This new implementation of Merge differs from the original one in a few ways:
@@ -34,10 +43,6 @@ namespace System.Linq
             //     instead of awaiting a new WhenAny task where "left" sources have preferential
             //     treatment over "right" sources.
             //
-
-            return AsyncEnumerable.Create(Core);
-
-            async IAsyncEnumerator<TSource> Core(CancellationToken cancellationToken)
             {
                 var count = sources.Length;
 
@@ -177,9 +182,6 @@ namespace System.Linq
                 }
             }
 #else
-            return AsyncEnumerable.Create(Core);
-
-            async IAsyncEnumerator<TSource> Core(CancellationToken cancellationToken)
             {
                 var count = sources.Length;
 

+ 19 - 1
Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/Repeat.cs

@@ -18,10 +18,16 @@ namespace System.Linq
         /// <returns>The async-enumerable sequence producing the element repeatedly and sequentially.</returns>
         public static IAsyncEnumerable<TResult> Repeat<TResult>(TResult element)
         {
+#pragma warning disable CS1998 // Async method lacks 'await' operators and will run synchronously
+#if HAS_ASYNC_ENUMERABLE_CANCELLATION
+            return Core(element);
+
+            static async IAsyncEnumerable<TResult> Core(TResult element, [System.Runtime.CompilerServices.EnumeratorCancellation]CancellationToken cancellationToken = default)
+#else
             return AsyncEnumerable.Create(Core);
 
-#pragma warning disable CS1998 // Async method lacks 'await' operators and will run synchronously
             async IAsyncEnumerator<TResult> Core(CancellationToken cancellationToken)
+#endif
             {
                 while (true)
                 {
@@ -45,9 +51,15 @@ namespace System.Linq
             if (source == null)
                 throw Error.ArgumentNull(nameof(source));
 
+#if HAS_ASYNC_ENUMERABLE_CANCELLATION
+            return Core(source);
+
+            static async IAsyncEnumerable<TSource> Core(IAsyncEnumerable<TSource> source, [System.Runtime.CompilerServices.EnumeratorCancellation]CancellationToken cancellationToken = default)
+#else
             return AsyncEnumerable.Create(Core);
 
             async IAsyncEnumerator<TSource> Core(CancellationToken cancellationToken)
+#endif
             {
                 while (true)
                 {
@@ -75,9 +87,15 @@ namespace System.Linq
             if (count < 0)
                 throw Error.ArgumentOutOfRange(nameof(count));
 
+#if HAS_ASYNC_ENUMERABLE_CANCELLATION
+            return Core(source, count);
+
+            static async IAsyncEnumerable<TSource> Core(IAsyncEnumerable<TSource> source, int count, [System.Runtime.CompilerServices.EnumeratorCancellation]CancellationToken cancellationToken = default)
+#else
             return AsyncEnumerable.Create(Core);
 
             async IAsyncEnumerator<TSource> Core(CancellationToken cancellationToken)
+#endif
             {
                 for (var i = 0; i < count; i++)
                 {

+ 36 - 0
Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/Scan.cs

@@ -30,9 +30,15 @@ namespace System.Linq
             if (accumulator == null)
                 throw Error.ArgumentNull(nameof(accumulator));
 
+#if HAS_ASYNC_ENUMERABLE_CANCELLATION
+            return Core(source, accumulator);
+
+            static async IAsyncEnumerable<TSource> Core(IAsyncEnumerable<TSource> source, Func<TSource, TSource, TSource> accumulator, [System.Runtime.CompilerServices.EnumeratorCancellation]CancellationToken cancellationToken = default)
+#else
             return AsyncEnumerable.Create(Core);
 
             async IAsyncEnumerator<TSource> Core(CancellationToken cancellationToken)
+#endif
             {
                 await using var e = source.GetConfiguredAsyncEnumerator(cancellationToken, false);
 
@@ -70,9 +76,15 @@ namespace System.Linq
             if (accumulator == null)
                 throw Error.ArgumentNull(nameof(accumulator));
 
+#if HAS_ASYNC_ENUMERABLE_CANCELLATION
+            return Core(source, seed, accumulator);
+
+            static async IAsyncEnumerable<TAccumulate> Core(IAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, TAccumulate> accumulator, [System.Runtime.CompilerServices.EnumeratorCancellation]CancellationToken cancellationToken = default)
+#else
             return AsyncEnumerable.Create(Core);
 
             async IAsyncEnumerator<TAccumulate> Core(CancellationToken cancellationToken)
+#endif
             {
                 var res = seed;
 
@@ -101,9 +113,15 @@ namespace System.Linq
             if (accumulator == null)
                 throw Error.ArgumentNull(nameof(accumulator));
 
+#if HAS_ASYNC_ENUMERABLE_CANCELLATION
+            return Core(source, accumulator);
+
+            static async IAsyncEnumerable<TSource> Core(IAsyncEnumerable<TSource> source, Func<TSource, TSource, ValueTask<TSource>> accumulator, [System.Runtime.CompilerServices.EnumeratorCancellation]CancellationToken cancellationToken = default)
+#else
             return AsyncEnumerable.Create(Core);
 
             async IAsyncEnumerator<TSource> Core(CancellationToken cancellationToken)
+#endif
             {
                 await using var e = source.GetConfiguredAsyncEnumerator(cancellationToken, false);
 
@@ -140,9 +158,15 @@ namespace System.Linq
             if (accumulator == null)
                 throw Error.ArgumentNull(nameof(accumulator));
 
+#if HAS_ASYNC_ENUMERABLE_CANCELLATION
+            return Core(source, accumulator);
+
+            static async IAsyncEnumerable<TSource> Core(IAsyncEnumerable<TSource> source, Func<TSource, TSource, CancellationToken, ValueTask<TSource>> accumulator, [System.Runtime.CompilerServices.EnumeratorCancellation]CancellationToken cancellationToken = default)
+#else
             return AsyncEnumerable.Create(Core);
 
             async IAsyncEnumerator<TSource> Core(CancellationToken cancellationToken)
+#endif
             {
                 await using var e = source.GetConfiguredAsyncEnumerator(cancellationToken, false);
 
@@ -181,9 +205,15 @@ namespace System.Linq
             if (accumulator == null)
                 throw Error.ArgumentNull(nameof(accumulator));
 
+#if HAS_ASYNC_ENUMERABLE_CANCELLATION
+            return Core(source, seed, accumulator);
+
+            static async IAsyncEnumerable<TAccumulate> Core(IAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, ValueTask<TAccumulate>> accumulator, [System.Runtime.CompilerServices.EnumeratorCancellation]CancellationToken cancellationToken = default)
+#else
             return AsyncEnumerable.Create(Core);
 
             async IAsyncEnumerator<TAccumulate> Core(CancellationToken cancellationToken)
+#endif
             {
                 var res = seed;
 
@@ -215,9 +245,15 @@ namespace System.Linq
             if (accumulator == null)
                 throw Error.ArgumentNull(nameof(accumulator));
 
+#if HAS_ASYNC_ENUMERABLE_CANCELLATION
+            return Core(source, seed, accumulator);
+
+            static async IAsyncEnumerable<TAccumulate> Core(IAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, CancellationToken, ValueTask<TAccumulate>> accumulator, [System.Runtime.CompilerServices.EnumeratorCancellation]CancellationToken cancellationToken = default)
+#else
             return AsyncEnumerable.Create(Core);
 
             async IAsyncEnumerator<TAccumulate> Core(CancellationToken cancellationToken)
+#endif
             {
                 var res = seed;
 

+ 18 - 0
Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/Using.cs

@@ -28,9 +28,15 @@ namespace System.Linq
             if (enumerableFactory == null)
                 throw Error.ArgumentNull(nameof(enumerableFactory));
 
+#if HAS_ASYNC_ENUMERABLE_CANCELLATION
+            return Core(resourceFactory, enumerableFactory);
+
+            static async IAsyncEnumerable<TSource> Core(Func<TResource> resourceFactory, Func<TResource, IAsyncEnumerable<TSource>> enumerableFactory, [System.Runtime.CompilerServices.EnumeratorCancellation]CancellationToken cancellationToken = default)
+#else
             return AsyncEnumerable.Create(Core);
 
             async IAsyncEnumerator<TSource> Core(CancellationToken cancellationToken)
+#endif
             {
                 using var resource = resourceFactory();
 
@@ -59,9 +65,15 @@ namespace System.Linq
             if (enumerableFactory == null)
                 throw Error.ArgumentNull(nameof(enumerableFactory));
 
+#if HAS_ASYNC_ENUMERABLE_CANCELLATION
+            return Core(resourceFactory, enumerableFactory);
+
+            static async IAsyncEnumerable<TSource> Core(Func<Task<TResource>> resourceFactory, Func<TResource, ValueTask<IAsyncEnumerable<TSource>>> enumerableFactory, [System.Runtime.CompilerServices.EnumeratorCancellation]CancellationToken cancellationToken = default)
+#else
             return AsyncEnumerable.Create(Core);
 
             async IAsyncEnumerator<TSource> Core(CancellationToken cancellationToken)
+#endif
             {
                 using var resource = await resourceFactory().ConfigureAwait(false);
 
@@ -92,9 +104,15 @@ namespace System.Linq
             if (enumerableFactory == null)
                 throw Error.ArgumentNull(nameof(enumerableFactory));
 
+#if HAS_ASYNC_ENUMERABLE_CANCELLATION
+            return Core(resourceFactory, enumerableFactory);
+
+            static async IAsyncEnumerable<TSource> Core(Func<CancellationToken, Task<TResource>> resourceFactory, Func<TResource, CancellationToken, ValueTask<IAsyncEnumerable<TSource>>> enumerableFactory, [System.Runtime.CompilerServices.EnumeratorCancellation]CancellationToken cancellationToken = default)
+#else
             return AsyncEnumerable.Create(Core);
 
             async IAsyncEnumerator<TSource> Core(CancellationToken cancellationToken)
+#endif
             {
                 using var resource = await resourceFactory(cancellationToken).ConfigureAwait(false);