Selaa lähdekoodia

Adapting to GetAsyncEnumerator(CancellationToken) interface change.

Bart De Smet 7 vuotta sitten
vanhempi
sitoutus
271688e421
77 muutettua tiedostoa jossa 437 lisäystä ja 370 poistoa
  1. 1 1
      Ix.NET/Source/System.Interactive.Async.Tests/AsyncTests.Bugs.cs
  2. 7 5
      Ix.NET/Source/System.Interactive.Async/AsyncIterator.cs
  3. 6 5
      Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/Amb.cs
  4. 3 2
      Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/Buffer.cs
  5. 9 8
      Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/Catch.cs
  6. 6 5
      Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/Concat.cs
  7. 2 2
      Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/Defer.cs
  8. 8 8
      Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/Distinct.cs
  9. 7 6
      Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/DistinctUntilChanged.cs
  10. 5 4
      Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/Do.cs
  11. 5 4
      Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/Expand.cs
  12. 5 4
      Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/Finally.cs
  13. 2 1
      Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/Generate.cs
  14. 3 2
      Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/IgnoreElements.cs
  15. 1 1
      Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/Max.cs
  16. 3 2
      Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/Merge.cs
  17. 1 1
      Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/Min.cs
  18. 2 2
      Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/MinBy.cs
  19. 3 2
      Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/OnErrorResumeNext.cs
  20. 6 3
      Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/Repeat.cs
  21. 9 8
      Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/Scan.cs
  22. 2 2
      Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/Timeout.cs
  23. 9 6
      Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/Using.cs
  24. 3 2
      Ix.NET/Source/System.Linq.Async.Queryable/System/Linq/AsyncEnumerableQuery.cs
  25. 4 1
      Ix.NET/Source/System.Linq.Async/System/Collections/Generic/IAsyncEnumerable.cs
  26. 3 3
      Ix.NET/Source/System.Linq.Async/System/Linq/AsyncEnumerableHelpers.cs
  27. 16 16
      Ix.NET/Source/System.Linq.Async/System/Linq/AsyncEnumerablePartition.cs
  28. 2 2
      Ix.NET/Source/System.Linq.Async/System/Linq/AsyncEnumerator.cs
  29. 7 5
      Ix.NET/Source/System.Linq.Async/System/Linq/AsyncIterator.cs
  30. 1 1
      Ix.NET/Source/System.Linq.Async/System/Linq/AsyncListPartition.cs
  31. 8 8
      Ix.NET/Source/System.Linq.Async/System/Linq/Operators/Aggregate.cs
  32. 4 4
      Ix.NET/Source/System.Linq.Async/System/Linq/Operators/All.cs
  33. 6 6
      Ix.NET/Source/System.Linq.Async/System/Linq/Operators/Any.cs
  34. 14 14
      Ix.NET/Source/System.Linq.Async/System/Linq/Operators/AppendPrepend.cs
  35. 30 30
      Ix.NET/Source/System.Linq.Async/System/Linq/Operators/Average.cs
  36. 5 5
      Ix.NET/Source/System.Linq.Async/System/Linq/Operators/Concat.cs
  37. 28 10
      Ix.NET/Source/System.Linq.Async/System/Linq/Operators/Create.cs
  38. 2 2
      Ix.NET/Source/System.Linq.Async/System/Linq/Operators/DefaultIfEmpty.cs
  39. 3 3
      Ix.NET/Source/System.Linq.Async/System/Linq/Operators/Distinct.cs
  40. 2 2
      Ix.NET/Source/System.Linq.Async/System/Linq/Operators/ElementAt.cs
  41. 2 2
      Ix.NET/Source/System.Linq.Async/System/Linq/Operators/ElementAtOrDefault.cs
  42. 1 1
      Ix.NET/Source/System.Linq.Async/System/Linq/Operators/Empty.cs
  43. 3 2
      Ix.NET/Source/System.Linq.Async/System/Linq/Operators/Except.cs
  44. 2 2
      Ix.NET/Source/System.Linq.Async/System/Linq/Operators/First.cs
  45. 2 2
      Ix.NET/Source/System.Linq.Async/System/Linq/Operators/FirstOrDefault.cs
  46. 4 4
      Ix.NET/Source/System.Linq.Async/System/Linq/Operators/ForEach.cs
  47. 7 7
      Ix.NET/Source/System.Linq.Async/System/Linq/Operators/GroupBy.cs
  48. 19 10
      Ix.NET/Source/System.Linq.Async/System/Linq/Operators/GroupJoin.cs
  49. 2 1
      Ix.NET/Source/System.Linq.Async/System/Linq/Operators/Grouping.cs
  50. 3 2
      Ix.NET/Source/System.Linq.Async/System/Linq/Operators/Intersect.cs
  51. 7 6
      Ix.NET/Source/System.Linq.Async/System/Linq/Operators/Join.cs
  52. 2 2
      Ix.NET/Source/System.Linq.Async/System/Linq/Operators/Last.cs
  53. 2 2
      Ix.NET/Source/System.Linq.Async/System/Linq/Operators/LastOrDefault.cs
  54. 3 3
      Ix.NET/Source/System.Linq.Async/System/Linq/Operators/Max.cs
  55. 3 3
      Ix.NET/Source/System.Linq.Async/System/Linq/Operators/Min.cs
  56. 14 13
      Ix.NET/Source/System.Linq.Async/System/Linq/Operators/OrderedAsyncEnumerable.cs
  57. 1 1
      Ix.NET/Source/System.Linq.Async/System/Linq/Operators/Range.cs
  58. 1 1
      Ix.NET/Source/System.Linq.Async/System/Linq/Operators/Repeat.cs
  59. 1 1
      Ix.NET/Source/System.Linq.Async/System/Linq/Operators/Reverse.cs
  60. 10 10
      Ix.NET/Source/System.Linq.Async/System/Linq/Operators/Select.cs
  61. 25 24
      Ix.NET/Source/System.Linq.Async/System/Linq/Operators/SelectMany.cs
  62. 2 2
      Ix.NET/Source/System.Linq.Async/System/Linq/Operators/SequenceEqual.cs
  63. 3 3
      Ix.NET/Source/System.Linq.Async/System/Linq/Operators/Single.cs
  64. 3 3
      Ix.NET/Source/System.Linq.Async/System/Linq/Operators/SingleOrDefault.cs
  65. 3 2
      Ix.NET/Source/System.Linq.Async/System/Linq/Operators/SkipLast.cs
  66. 9 8
      Ix.NET/Source/System.Linq.Async/System/Linq/Operators/SkipWhile.cs
  67. 3 2
      Ix.NET/Source/System.Linq.Async/System/Linq/Operators/TakeLast.cs
  68. 9 8
      Ix.NET/Source/System.Linq.Async/System/Linq/Operators/TakeWhile.cs
  69. 3 3
      Ix.NET/Source/System.Linq.Async/System/Linq/Operators/ToAsyncEnumerable.cs
  70. 1 1
      Ix.NET/Source/System.Linq.Async/System/Linq/Operators/ToEnumerable.cs
  71. 16 16
      Ix.NET/Source/System.Linq.Async/System/Linq/Operators/ToLookup.cs
  72. 1 1
      Ix.NET/Source/System.Linq.Async/System/Linq/Operators/ToObservable.cs
  73. 4 4
      Ix.NET/Source/System.Linq.Async/System/Linq/Operators/Union.cs
  74. 11 10
      Ix.NET/Source/System.Linq.Async/System/Linq/Operators/Where.cs
  75. 7 6
      Ix.NET/Source/System.Linq.Async/System/Linq/Operators/Zip.cs
  76. 3 2
      Ix.NET/Source/System.Linq.Async/System/Linq/Set.cs
  77. 2 2
      Ix.NET/Source/System.Linq.Async/System/Threading/Tasks/AsyncEnumerableExtensions.cs

+ 1 - 1
Ix.NET/Source/System.Interactive.Async.Tests/AsyncTests.Bugs.cs

@@ -124,7 +124,7 @@ namespace Tests
         {
             public int DisposeCount { get; private set; }
 
-            public IAsyncEnumerator<object> GetAsyncEnumerator()
+            public IAsyncEnumerator<object> GetAsyncEnumerator(CancellationToken cancellationToken)
             {
                 return new Enumerator(this);
             }

+ 7 - 5
Ix.NET/Source/System.Interactive.Async/AsyncIterator.cs

@@ -16,23 +16,25 @@ namespace System.Linq
 
         internal TSource current;
         internal AsyncIteratorState state = AsyncIteratorState.New;
+        internal CancellationToken token;
 
         protected AsyncIterator()
         {
             threadId = Environment.CurrentManagedThreadId;
         }
 
-        public IAsyncEnumerator<TSource> GetAsyncEnumerator()
+        public IAsyncEnumerator<TSource> GetAsyncEnumerator(CancellationToken token)
         {
             var enumerator = state == AsyncIteratorState.New && threadId == Environment.CurrentManagedThreadId
                 ? this
                 : Clone();
 
             enumerator.state = AsyncIteratorState.Allocated;
+            enumerator.token = token;
 
             try
             {
-                enumerator.OnGetEnumerator();
+                enumerator.OnGetEnumerator(token);
             }
             catch
             {
@@ -75,7 +77,7 @@ namespace System.Linq
 
             try
             {
-                var result = await MoveNextCore().ConfigureAwait(false);
+                var result = await MoveNextCore(token).ConfigureAwait(false);
 
                 currentIsInvalid = !result; // if move next is false, invalid otherwise valid
 
@@ -91,9 +93,9 @@ namespace System.Linq
 
         public abstract AsyncIterator<TSource> Clone();
 
-        protected abstract ValueTask<bool> MoveNextCore();
+        protected abstract ValueTask<bool> MoveNextCore(CancellationToken cancellationToken);
 
-        protected virtual void OnGetEnumerator()
+        protected virtual void OnGetEnumerator(CancellationToken cancellationToken)
         {
         }
     }

+ 6 - 5
Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/Amb.cs

@@ -4,6 +4,7 @@
 
 using System.Collections.Generic;
 using System.Diagnostics;
+using System.Threading;
 using System.Threading.Tasks;
 
 namespace System.Linq
@@ -68,13 +69,13 @@ namespace System.Linq
                 await base.DisposeAsync().ConfigureAwait(false);
             }
 
-            protected override async ValueTask<bool> MoveNextCore()
+            protected override async ValueTask<bool> MoveNextCore(CancellationToken cancellationToken)
             {
                 switch (state)
                 {
                     case AsyncIteratorState.Allocated:
-                        var firstEnumerator = first.GetAsyncEnumerator();
-                        var secondEnumerator = second.GetAsyncEnumerator();
+                        var firstEnumerator = first.GetAsyncEnumerator(cancellationToken);
+                        var secondEnumerator = second.GetAsyncEnumerator(cancellationToken);
 
                         var firstMoveNext = firstEnumerator.MoveNextAsync().AsTask();
                         var secondMoveNext = secondEnumerator.MoveNextAsync().AsTask();
@@ -163,7 +164,7 @@ namespace System.Linq
                 await base.DisposeAsync().ConfigureAwait(false);
             }
 
-            protected override async ValueTask<bool> MoveNextCore()
+            protected override async ValueTask<bool> MoveNextCore(CancellationToken cancellationToken)
             {
                 switch (state)
                 {
@@ -175,7 +176,7 @@ namespace System.Linq
 
                         for (var i = 0; i < n; i++)
                         {
-                            var enumerator = sources[i].GetAsyncEnumerator();
+                            var enumerator = sources[i].GetAsyncEnumerator(cancellationToken);
 
                             enumerators[i] = enumerator;
                             moveNexts[i] = enumerator.MoveNextAsync();

+ 3 - 2
Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/Buffer.cs

@@ -4,6 +4,7 @@
 
 using System.Collections.Generic;
 using System.Diagnostics;
+using System.Threading;
 using System.Threading.Tasks;
 
 namespace System.Linq
@@ -83,12 +84,12 @@ namespace System.Linq
                 await base.DisposeAsync().ConfigureAwait(false);
             }
 
-            protected override async ValueTask<bool> MoveNextCore()
+            protected override async ValueTask<bool> MoveNextCore(CancellationToken cancellationToken)
             {
                 switch (state)
                 {
                     case AsyncIteratorState.Allocated:
-                        enumerator = source.GetAsyncEnumerator();
+                        enumerator = source.GetAsyncEnumerator(cancellationToken);
                         buffers = new Queue<IList<TSource>>();
                         index = 0;
                         stopped = false;

+ 9 - 8
Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/Catch.cs

@@ -5,6 +5,7 @@
 using System.Collections.Generic;
 using System.Diagnostics;
 using System.Runtime.ExceptionServices;
+using System.Threading;
 using System.Threading.Tasks;
 
 namespace System.Linq
@@ -97,12 +98,12 @@ namespace System.Linq
                 await base.DisposeAsync().ConfigureAwait(false);
             }
 
-            protected override async ValueTask<bool> MoveNextCore()
+            protected override async ValueTask<bool> MoveNextCore(CancellationToken cancellationToken)
             {
                 switch (state)
                 {
                     case AsyncIteratorState.Allocated:
-                        enumerator = source.GetAsyncEnumerator();
+                        enumerator = source.GetAsyncEnumerator(cancellationToken);
                         isDone = false;
 
                         state = AsyncIteratorState.Iterating;
@@ -127,7 +128,7 @@ namespace System.Linq
                                     // invoking the handler, but we use this order to preserve
                                     // current behavior
                                     var inner = handler(ex);
-                                    var err = inner.GetAsyncEnumerator();
+                                    var err = inner.GetAsyncEnumerator(cancellationToken);
 
                                     if (enumerator != null)
                                     {
@@ -190,12 +191,12 @@ namespace System.Linq
                 await base.DisposeAsync().ConfigureAwait(false);
             }
 
-            protected override async ValueTask<bool> MoveNextCore()
+            protected override async ValueTask<bool> MoveNextCore(CancellationToken cancellationToken)
             {
                 switch (state)
                 {
                     case AsyncIteratorState.Allocated:
-                        enumerator = source.GetAsyncEnumerator();
+                        enumerator = source.GetAsyncEnumerator(cancellationToken);
                         isDone = false;
 
                         state = AsyncIteratorState.Iterating;
@@ -220,7 +221,7 @@ namespace System.Linq
                                     // invoking the handler, but we use this order to preserve
                                     // current behavior
                                     var inner = await handler(ex).ConfigureAwait(false);
-                                    var err = inner.GetAsyncEnumerator();
+                                    var err = inner.GetAsyncEnumerator(cancellationToken);
 
                                     if (enumerator != null)
                                     {
@@ -290,7 +291,7 @@ namespace System.Linq
                 await base.DisposeAsync().ConfigureAwait(false);
             }
 
-            protected override async ValueTask<bool> MoveNextCore()
+            protected override async ValueTask<bool> MoveNextCore(CancellationToken cancellationToken)
             {
                 switch (state)
                 {
@@ -313,7 +314,7 @@ namespace System.Linq
                                 }
 
                                 error = null;
-                                enumerator = sourcesEnumerator.Current.GetAsyncEnumerator();
+                                enumerator = sourcesEnumerator.Current.GetAsyncEnumerator(cancellationToken);
                             }
 
                             try

+ 6 - 5
Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/Concat.cs

@@ -4,6 +4,7 @@
 
 using System.Collections.Generic;
 using System.Diagnostics;
+using System.Threading;
 using System.Threading.Tasks;
 
 namespace System.Linq
@@ -80,7 +81,7 @@ namespace System.Linq
             private const int State_OuterNext = 1;
             private const int State_While = 4;
 
-            protected override async ValueTask<bool> MoveNextCore()
+            protected override async ValueTask<bool> MoveNextCore(CancellationToken cancellationToken)
             {
 
                 switch (state)
@@ -103,7 +104,7 @@ namespace System.Linq
                                         await currentEnumerator.DisposeAsync().ConfigureAwait(false);
                                     }
 
-                                    currentEnumerator = outerEnumerator.Current.GetAsyncEnumerator();
+                                    currentEnumerator = outerEnumerator.Current.GetAsyncEnumerator(cancellationToken);
 
                                     mode = State_While;
                                     goto case State_While;
@@ -170,13 +171,13 @@ namespace System.Linq
             private const int State_OuterNext = 1;
             private const int State_While = 4;
 
-            protected override async ValueTask<bool> MoveNextCore()
+            protected override async ValueTask<bool> MoveNextCore(CancellationToken cancellationToken)
             {
 
                 switch (state)
                 {
                     case AsyncIteratorState.Allocated:
-                        outerEnumerator = source.GetAsyncEnumerator();
+                        outerEnumerator = source.GetAsyncEnumerator(cancellationToken);
                         mode = State_OuterNext;
                         state = AsyncIteratorState.Iterating;
                         goto case AsyncIteratorState.Iterating;
@@ -193,7 +194,7 @@ namespace System.Linq
                                         await currentEnumerator.DisposeAsync().ConfigureAwait(false);
                                     }
 
-                                    currentEnumerator = outerEnumerator.Current.GetAsyncEnumerator();
+                                    currentEnumerator = outerEnumerator.Current.GetAsyncEnumerator(cancellationToken);
 
                                     mode = State_While;
                                     goto case State_While;

+ 2 - 2
Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/Defer.cs

@@ -15,7 +15,7 @@ namespace System.Linq
             if (factory == null)
                 throw new ArgumentNullException(nameof(factory));
 
-            return CreateEnumerable(() => factory().GetAsyncEnumerator());
+            return CreateEnumerable(ct => factory().GetAsyncEnumerator(ct));
         }
 
         public static IAsyncEnumerable<TSource> Defer<TSource>(Func<Task<IAsyncEnumerable<TSource>>> factory)
@@ -23,7 +23,7 @@ namespace System.Linq
             if (factory == null)
                 throw new ArgumentNullException(nameof(factory));
 
-            return CreateEnumerable(async () => (await factory().ConfigureAwait(false)).GetAsyncEnumerator());
+            return CreateEnumerable(async ct => (await factory().ConfigureAwait(false)).GetAsyncEnumerator(ct));
         }
     }
 }

+ 8 - 8
Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/Distinct.cs

@@ -107,7 +107,7 @@ namespace System.Linq
                 var count = 0;
                 var s = new Set<TKey>(comparer);
 
-                var enu = source.GetAsyncEnumerator();
+                var enu = source.GetAsyncEnumerator(cancellationToken);
 
                 try
                 {
@@ -145,12 +145,12 @@ namespace System.Linq
                 await base.DisposeAsync().ConfigureAwait(false);
             }
 
-            protected override async ValueTask<bool> MoveNextCore()
+            protected override async ValueTask<bool> MoveNextCore(CancellationToken cancellationToken)
             {
                 switch (state)
                 {
                     case AsyncIteratorState.Allocated:
-                        enumerator = source.GetAsyncEnumerator();
+                        enumerator = source.GetAsyncEnumerator(cancellationToken);
 
                         if (!await enumerator.MoveNextAsync().ConfigureAwait(false))
                         {
@@ -189,7 +189,7 @@ namespace System.Linq
                 var s = new Set<TKey>(comparer);
                 var r = new List<TSource>();
 
-                var enu = source.GetAsyncEnumerator();
+                var enu = source.GetAsyncEnumerator(cancellationToken);
 
                 try
                 {
@@ -253,7 +253,7 @@ namespace System.Linq
                 var count = 0;
                 var s = new Set<TKey>(comparer);
 
-                var enu = source.GetAsyncEnumerator();
+                var enu = source.GetAsyncEnumerator(cancellationToken);
 
                 try
                 {
@@ -291,12 +291,12 @@ namespace System.Linq
                 await base.DisposeAsync().ConfigureAwait(false);
             }
 
-            protected override async ValueTask<bool> MoveNextCore()
+            protected override async ValueTask<bool> MoveNextCore(CancellationToken cancellationToken)
             {
                 switch (state)
                 {
                     case AsyncIteratorState.Allocated:
-                        enumerator = source.GetAsyncEnumerator();
+                        enumerator = source.GetAsyncEnumerator(cancellationToken);
 
                         if (!await enumerator.MoveNextAsync().ConfigureAwait(false))
                         {
@@ -335,7 +335,7 @@ namespace System.Linq
                 var s = new Set<TKey>(comparer);
                 var r = new List<TSource>();
 
-                var enu = source.GetAsyncEnumerator();
+                var enu = source.GetAsyncEnumerator(cancellationToken);
 
                 try
                 {

+ 7 - 6
Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/DistinctUntilChanged.cs

@@ -4,6 +4,7 @@
 
 using System.Collections.Generic;
 using System.Diagnostics;
+using System.Threading;
 using System.Threading.Tasks;
 
 namespace System.Linq
@@ -122,12 +123,12 @@ namespace System.Linq
                 await base.DisposeAsync().ConfigureAwait(false);
             }
 
-            protected override async ValueTask<bool> MoveNextCore()
+            protected override async ValueTask<bool> MoveNextCore(CancellationToken cancellationToken)
             {
                 switch (state)
                 {
                     case AsyncIteratorState.Allocated:
-                        enumerator = source.GetAsyncEnumerator();
+                        enumerator = source.GetAsyncEnumerator(cancellationToken);
                         state = AsyncIteratorState.Iterating;
                         goto case AsyncIteratorState.Iterating;
 
@@ -193,12 +194,12 @@ namespace System.Linq
                 await base.DisposeAsync().ConfigureAwait(false);
             }
 
-            protected override async ValueTask<bool> MoveNextCore()
+            protected override async ValueTask<bool> MoveNextCore(CancellationToken cancellationToken)
             {
                 switch (state)
                 {
                     case AsyncIteratorState.Allocated:
-                        enumerator = source.GetAsyncEnumerator();
+                        enumerator = source.GetAsyncEnumerator(cancellationToken);
                         state = AsyncIteratorState.Iterating;
                         goto case AsyncIteratorState.Iterating;
 
@@ -264,12 +265,12 @@ namespace System.Linq
                 await base.DisposeAsync().ConfigureAwait(false);
             }
 
-            protected override async ValueTask<bool> MoveNextCore()
+            protected override async ValueTask<bool> MoveNextCore(CancellationToken cancellationToken)
             {
                 switch (state)
                 {
                     case AsyncIteratorState.Allocated:
-                        enumerator = source.GetAsyncEnumerator();
+                        enumerator = source.GetAsyncEnumerator(cancellationToken);
                         state = AsyncIteratorState.Iterating;
                         goto case AsyncIteratorState.Iterating;
 

+ 5 - 4
Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/Do.cs

@@ -4,6 +4,7 @@
 
 using System.Collections.Generic;
 using System.Diagnostics;
+using System.Threading;
 using System.Threading.Tasks;
 
 namespace System.Linq
@@ -162,12 +163,12 @@ namespace System.Linq
                 await base.DisposeAsync().ConfigureAwait(false);
             }
 
-            protected override async ValueTask<bool> MoveNextCore()
+            protected override async ValueTask<bool> MoveNextCore(CancellationToken cancellationToken)
             {
                 switch (state)
                 {
                     case AsyncIteratorState.Allocated:
-                        enumerator = source.GetAsyncEnumerator();
+                        enumerator = source.GetAsyncEnumerator(cancellationToken);
                         state = AsyncIteratorState.Iterating;
                         goto case AsyncIteratorState.Iterating;
 
@@ -238,12 +239,12 @@ namespace System.Linq
                 await base.DisposeAsync().ConfigureAwait(false);
             }
 
-            protected override async ValueTask<bool> MoveNextCore()
+            protected override async ValueTask<bool> MoveNextCore(CancellationToken cancellationToken)
             {
                 switch (state)
                 {
                     case AsyncIteratorState.Allocated:
-                        enumerator = source.GetAsyncEnumerator();
+                        enumerator = source.GetAsyncEnumerator(cancellationToken);
                         state = AsyncIteratorState.Iterating;
                         goto case AsyncIteratorState.Iterating;
 

+ 5 - 4
Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/Expand.cs

@@ -4,6 +4,7 @@
 
 using System.Collections.Generic;
 using System.Diagnostics;
+using System.Threading;
 using System.Threading.Tasks;
 
 namespace System.Linq
@@ -66,7 +67,7 @@ namespace System.Linq
                 await base.DisposeAsync().ConfigureAwait(false);
             }
 
-            protected override async ValueTask<bool> MoveNextCore()
+            protected override async ValueTask<bool> MoveNextCore(CancellationToken cancellationToken)
             {
                 switch (state)
                 {
@@ -91,7 +92,7 @@ namespace System.Linq
                                         await enumerator.DisposeAsync().ConfigureAwait(false);
                                     }
 
-                                    enumerator = src.GetAsyncEnumerator();
+                                    enumerator = src.GetAsyncEnumerator(cancellationToken);
 
                                     continue; // loop
                                 }
@@ -156,7 +157,7 @@ namespace System.Linq
                 await base.DisposeAsync().ConfigureAwait(false);
             }
 
-            protected override async ValueTask<bool> MoveNextCore()
+            protected override async ValueTask<bool> MoveNextCore(CancellationToken cancellationToken)
             {
                 switch (state)
                 {
@@ -181,7 +182,7 @@ namespace System.Linq
                                         await enumerator.DisposeAsync().ConfigureAwait(false);
                                     }
 
-                                    enumerator = src.GetAsyncEnumerator();
+                                    enumerator = src.GetAsyncEnumerator(cancellationToken);
 
                                     continue; // loop
                                 }

+ 5 - 4
Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/Finally.cs

@@ -4,6 +4,7 @@
 
 using System.Collections.Generic;
 using System.Diagnostics;
+using System.Threading;
 using System.Threading.Tasks;
 
 namespace System.Linq
@@ -64,12 +65,12 @@ namespace System.Linq
                 await base.DisposeAsync().ConfigureAwait(false);
             }
 
-            protected override async ValueTask<bool> MoveNextCore()
+            protected override async ValueTask<bool> MoveNextCore(CancellationToken cancellationToken)
             {
                 switch (state)
                 {
                     case AsyncIteratorState.Allocated:
-                        enumerator = source.GetAsyncEnumerator();
+                        enumerator = source.GetAsyncEnumerator(cancellationToken);
                         state = AsyncIteratorState.Iterating;
                         goto case AsyncIteratorState.Iterating;
 
@@ -122,12 +123,12 @@ namespace System.Linq
                 await base.DisposeAsync().ConfigureAwait(false);
             }
 
-            protected override async ValueTask<bool> MoveNextCore()
+            protected override async ValueTask<bool> MoveNextCore(CancellationToken cancellationToken)
             {
                 switch (state)
                 {
                     case AsyncIteratorState.Allocated:
-                        enumerator = source.GetAsyncEnumerator();
+                        enumerator = source.GetAsyncEnumerator(cancellationToken);
                         state = AsyncIteratorState.Iterating;
                         goto case AsyncIteratorState.Iterating;
 

+ 2 - 1
Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/Generate.cs

@@ -4,6 +4,7 @@
 
 using System.Collections.Generic;
 using System.Diagnostics;
+using System.Threading;
 using System.Threading.Tasks;
 
 namespace System.Linq
@@ -65,7 +66,7 @@ namespace System.Linq
                 await base.DisposeAsync().ConfigureAwait(false);
             }
 
-            protected override async ValueTask<bool> MoveNextCore()
+            protected override async ValueTask<bool> MoveNextCore(CancellationToken cancellationToken)
             {
                 switch (state)
                 {

+ 3 - 2
Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/IgnoreElements.cs

@@ -4,6 +4,7 @@
 
 using System.Collections.Generic;
 using System.Diagnostics;
+using System.Threading;
 using System.Threading.Tasks;
 
 namespace System.Linq
@@ -48,12 +49,12 @@ namespace System.Linq
                 await base.DisposeAsync().ConfigureAwait(false);
             }
 
-            protected override async ValueTask<bool> MoveNextCore()
+            protected override async ValueTask<bool> MoveNextCore(CancellationToken cancellationToken)
             {
                 switch (state)
                 {
                     case AsyncIteratorState.Allocated:
-                        enumerator = source.GetAsyncEnumerator();
+                        enumerator = source.GetAsyncEnumerator(cancellationToken);
                         state = AsyncIteratorState.Iterating;
                         goto case AsyncIteratorState.Iterating;
 

+ 1 - 1
Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/Max.cs

@@ -32,7 +32,7 @@ namespace System.Linq
 
         private static async Task<TSource> MaxCore<TSource>(IAsyncEnumerable<TSource> source, IComparer<TSource> comparer, CancellationToken cancellationToken)
         {
-            var e = source.GetAsyncEnumerator();
+            var e = source.GetAsyncEnumerator(cancellationToken);
 
             try
             {

+ 3 - 2
Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/Merge.cs

@@ -4,6 +4,7 @@
 
 using System.Collections.Generic;
 using System.Diagnostics;
+using System.Threading;
 using System.Threading.Tasks;
 
 namespace System.Linq
@@ -75,7 +76,7 @@ namespace System.Linq
                 await base.DisposeAsync().ConfigureAwait(false);
             }
 
-            protected override async ValueTask<bool> MoveNextCore()
+            protected override async ValueTask<bool> MoveNextCore(CancellationToken cancellationToken)
             {
                 switch (state)
                 {
@@ -88,7 +89,7 @@ namespace System.Linq
 
                         for (var i = 0; i < n; i++)
                         {
-                            var enumerator = sources[i].GetAsyncEnumerator();
+                            var enumerator = sources[i].GetAsyncEnumerator(cancellationToken);
                             enumerators[i] = enumerator;
                             moveNexts[i] = enumerator.MoveNextAsync();
                         }

+ 1 - 1
Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/Min.cs

@@ -32,7 +32,7 @@ namespace System.Linq
 
         private static async Task<TSource> MinCore<TSource>(IAsyncEnumerable<TSource> source, IComparer<TSource> comparer, CancellationToken cancellationToken)
         {
-            var e = source.GetAsyncEnumerator();
+            var e = source.GetAsyncEnumerator(cancellationToken);
 
             try
             {

+ 2 - 2
Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/MinBy.cs

@@ -112,7 +112,7 @@ namespace System.Linq
         {
             var result = new List<TSource>();
 
-            var e = source.GetAsyncEnumerator();
+            var e = source.GetAsyncEnumerator(cancellationToken);
 
             try
             {
@@ -152,7 +152,7 @@ namespace System.Linq
         {
             var result = new List<TSource>();
 
-            var e = source.GetAsyncEnumerator();
+            var e = source.GetAsyncEnumerator(cancellationToken);
 
             try
             {

+ 3 - 2
Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/OnErrorResumeNext.cs

@@ -4,6 +4,7 @@
 
 using System.Collections.Generic;
 using System.Diagnostics;
+using System.Threading;
 using System.Threading.Tasks;
 
 namespace System.Linq
@@ -77,7 +78,7 @@ namespace System.Linq
                 await base.DisposeAsync().ConfigureAwait(false);
             }
 
-            protected override async ValueTask<bool> MoveNextCore()
+            protected override async ValueTask<bool> MoveNextCore(CancellationToken cancellationToken)
             {
                 switch (state)
                 {
@@ -97,7 +98,7 @@ namespace System.Linq
                                     break; // while -- done, nothing else to do
                                 }
 
-                                enumerator = sourcesEnumerator.Current.GetAsyncEnumerator();
+                                enumerator = sourcesEnumerator.Current.GetAsyncEnumerator(cancellationToken);
                             }
 
                             try

+ 6 - 3
Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/Repeat.cs

@@ -4,6 +4,7 @@
 
 using System.Collections.Generic;
 using System.Diagnostics;
+using System.Threading;
 using System.Threading.Tasks;
 
 namespace System.Linq
@@ -47,8 +48,10 @@ namespace System.Linq
                 return new RepeatElementAsyncIterator<TResult>(element);
             }
 
-            protected override ValueTask<bool> MoveNextCore()
+            protected override ValueTask<bool> MoveNextCore(CancellationToken cancellationToken)
             {
+                cancellationToken.ThrowIfCancellationRequested();
+
                 current = element;
                 return TaskExt.True;
             }
@@ -89,7 +92,7 @@ namespace System.Linq
                 await base.DisposeAsync().ConfigureAwait(false);
             }
 
-            protected override async ValueTask<bool> MoveNextCore()
+            protected override async ValueTask<bool> MoveNextCore(CancellationToken cancellationToken)
             {
                 switch (state)
                 {
@@ -104,7 +107,7 @@ namespace System.Linq
                         if (!isInfinite && currentCount-- == 0)
                             break;
 
-                        enumerator = source.GetAsyncEnumerator();
+                        enumerator = source.GetAsyncEnumerator(cancellationToken);
                         state = AsyncIteratorState.Iterating;
 
                         goto case AsyncIteratorState.Iterating;

+ 9 - 8
Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/Scan.cs

@@ -4,6 +4,7 @@
 
 using System.Collections.Generic;
 using System.Diagnostics;
+using System.Threading;
 using System.Threading.Tasks;
 
 namespace System.Linq
@@ -86,12 +87,12 @@ namespace System.Linq
                 await base.DisposeAsync().ConfigureAwait(false);
             }
 
-            protected override async ValueTask<bool> MoveNextCore()
+            protected override async ValueTask<bool> MoveNextCore(CancellationToken cancellationToken)
             {
                 switch (state)
                 {
                     case AsyncIteratorState.Allocated:
-                        enumerator = source.GetAsyncEnumerator();
+                        enumerator = source.GetAsyncEnumerator(cancellationToken);
                         hasSeed = false;
                         accumulated = default(TSource);
 
@@ -160,12 +161,12 @@ namespace System.Linq
                 await base.DisposeAsync().ConfigureAwait(false);
             }
 
-            protected override async ValueTask<bool> MoveNextCore()
+            protected override async ValueTask<bool> MoveNextCore(CancellationToken cancellationToken)
             {
                 switch (state)
                 {
                     case AsyncIteratorState.Allocated:
-                        enumerator = source.GetAsyncEnumerator();
+                        enumerator = source.GetAsyncEnumerator(cancellationToken);
                         accumulated = seed;
 
                         state = AsyncIteratorState.Iterating;
@@ -224,12 +225,12 @@ namespace System.Linq
                 await base.DisposeAsync().ConfigureAwait(false);
             }
 
-            protected override async ValueTask<bool> MoveNextCore()
+            protected override async ValueTask<bool> MoveNextCore(CancellationToken cancellationToken)
             {
                 switch (state)
                 {
                     case AsyncIteratorState.Allocated:
-                        enumerator = source.GetAsyncEnumerator();
+                        enumerator = source.GetAsyncEnumerator(cancellationToken);
                         hasSeed = false;
                         accumulated = default(TSource);
 
@@ -298,12 +299,12 @@ namespace System.Linq
                 await base.DisposeAsync().ConfigureAwait(false);
             }
 
-            protected override async ValueTask<bool> MoveNextCore()
+            protected override async ValueTask<bool> MoveNextCore(CancellationToken cancellationToken)
             {
                 switch (state)
                 {
                     case AsyncIteratorState.Allocated:
-                        enumerator = source.GetAsyncEnumerator();
+                        enumerator = source.GetAsyncEnumerator(cancellationToken);
                         accumulated = seed;
 
                         state = AsyncIteratorState.Iterating;

+ 2 - 2
Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/Timeout.cs

@@ -54,12 +54,12 @@ namespace System.Linq
                 await base.DisposeAsync().ConfigureAwait(false);
             }
 
-            protected override async ValueTask<bool> MoveNextCore()
+            protected override async ValueTask<bool> MoveNextCore(CancellationToken cancellationToken)
             {
                 switch (state)
                 {
                     case AsyncIteratorState.Allocated:
-                        enumerator = source.GetAsyncEnumerator();
+                        enumerator = source.GetAsyncEnumerator(cancellationToken);
 
                         state = AsyncIteratorState.Iterating;
                         goto case AsyncIteratorState.Iterating;

+ 9 - 6
Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/Using.cs

@@ -4,6 +4,7 @@
 
 using System.Collections.Generic;
 using System.Diagnostics;
+using System.Threading;
 using System.Threading.Tasks;
 
 namespace System.Linq
@@ -70,12 +71,12 @@ namespace System.Linq
                 await base.DisposeAsync().ConfigureAwait(false);
             }
 
-            protected override async ValueTask<bool> MoveNextCore()
+            protected override async ValueTask<bool> MoveNextCore(CancellationToken cancellationToken)
             {
                 switch (state)
                 {
                     case AsyncIteratorState.Allocated:
-                        enumerator = enumerable.GetAsyncEnumerator();
+                        enumerator = enumerable.GetAsyncEnumerator(cancellationToken);
                         state = AsyncIteratorState.Iterating;
                         goto case AsyncIteratorState.Iterating;
 
@@ -93,12 +94,14 @@ namespace System.Linq
                 return false;
             }
 
-            protected override void OnGetEnumerator()
+            protected override void OnGetEnumerator(CancellationToken cancellationToken)
             {
+                // REVIEW: Wire cancellation to the functions.
+
                 resource = resourceFactory();
                 enumerable = enumerableFactory(resource);
 
-                base.OnGetEnumerator();
+                base.OnGetEnumerator(cancellationToken);
             }
         }
 
@@ -142,7 +145,7 @@ namespace System.Linq
                 await base.DisposeAsync().ConfigureAwait(false);
             }
 
-            protected override async ValueTask<bool> MoveNextCore()
+            protected override async ValueTask<bool> MoveNextCore(CancellationToken cancellationToken)
             {
                 switch (state)
                 {
@@ -150,7 +153,7 @@ namespace System.Linq
                         resource = await resourceFactory().ConfigureAwait(false);
                         enumerable = await enumerableFactory(resource).ConfigureAwait(false);
 
-                        enumerator = enumerable.GetAsyncEnumerator();
+                        enumerator = enumerable.GetAsyncEnumerator(cancellationToken);
                         state = AsyncIteratorState.Iterating;
                         goto case AsyncIteratorState.Iterating;
 

+ 3 - 2
Ix.NET/Source/System.Linq.Async.Queryable/System/Linq/AsyncEnumerableQuery.cs

@@ -144,8 +144,9 @@ namespace System.Linq
         /// <summary>
         /// Gets an enumerator to enumerate the elements in the sequence.
         /// </summary>
+        /// <param name="token">Cancellation token used to cancel the enumeration.</param>
         /// <returns>A new enumerator instance used to enumerate the elements in the sequence.</returns>
-        public IAsyncEnumerator<T> GetAsyncEnumerator()
+        public IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken token)
         {
             if (_enumerable == null)
             {
@@ -153,7 +154,7 @@ namespace System.Linq
                 _enumerable = expression.Compile()();
             }
 
-            return _enumerable.GetAsyncEnumerator();
+            return _enumerable.GetAsyncEnumerator(token);
         }
 
         /// <summary>

+ 4 - 1
Ix.NET/Source/System.Linq.Async/System/Collections/Generic/IAsyncEnumerable.cs

@@ -5,6 +5,8 @@
 // See https://github.com/dotnet/csharplang/blob/master/proposals/async-streams.md for the definition of this interface
 // and the design rationale. (8/30/2017)
 
+using System.Threading;
+
 namespace System.Collections.Generic
 {
     /// <summary>
@@ -16,7 +18,8 @@ namespace System.Collections.Generic
         /// <summary>
         /// Gets an asynchronous enumerator over the sequence.
         /// </summary>
+        /// <param name="cancellationToken">Cancellation token used to cancel the enumeration.</param>
         /// <returns>Enumerator for asynchronous enumeration over the sequence.</returns>
-        IAsyncEnumerator<T> GetAsyncEnumerator();
+        IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken = default);
     }
 }

+ 3 - 3
Ix.NET/Source/System.Linq.Async/System/Linq/AsyncEnumerableHelpers.cs

@@ -42,18 +42,18 @@ namespace System.Collections.Generic
             }
             else
             {
-                var en = source.GetAsyncEnumerator();
+                var en = source.GetAsyncEnumerator(cancellationToken);
 
                 try
                 {
-                    if (await en.MoveNextAsync(cancellationToken).ConfigureAwait(false))
+                    if (await en.MoveNextAsync().ConfigureAwait(false))
                     {
                         const int DefaultCapacity = 4;
                         var arr = new T[DefaultCapacity];
                         arr[0] = en.Current;
                         var count = 1;
 
-                        while (await en.MoveNextAsync(cancellationToken).ConfigureAwait(false))
+                        while (await en.MoveNextAsync().ConfigureAwait(false))
                         {
                             if (count == arr.Length)
                             {

+ 16 - 16
Ix.NET/Source/System.Linq.Async/System/Linq/AsyncEnumerablePartition.cs

@@ -76,7 +76,7 @@ namespace System.Linq
                 return Math.Max(await _source.Count(cancellationToken).ConfigureAwait(false) - _minIndexInclusive, 0);
             }
 
-            var en = _source.GetAsyncEnumerator();
+            var en = _source.GetAsyncEnumerator(cancellationToken);
 
             try
             {
@@ -102,12 +102,12 @@ namespace System.Linq
         private bool hasSkipped;
         private int taken;
 
-        protected override async ValueTask<bool> MoveNextCore()
+        protected override async ValueTask<bool> MoveNextCore(CancellationToken cancellationToken)
         {
             switch (state)
             {
                 case AsyncIteratorState.Allocated:
-                    _enumerator = _source.GetAsyncEnumerator();
+                    _enumerator = _source.GetAsyncEnumerator(cancellationToken);
                     hasSkipped = false;
                     taken = 0;
 
@@ -217,13 +217,13 @@ namespace System.Linq
             // If the index is negative or >= our max count, return early.
             if (index >= 0 && (!HasLimit || index < Limit))
             {
-                var en = _source.GetAsyncEnumerator();
+                var en = _source.GetAsyncEnumerator(cancellationToken);
 
                 try
                 {
                     Debug.Assert(_minIndexInclusive + index >= 0, $"Adding {nameof(index)} caused {nameof(_minIndexInclusive)} to overflow.");
 
-                    if (await SkipBeforeAsync(_minIndexInclusive + index, en, cancellationToken).ConfigureAwait(false) && await en.MoveNextAsync(cancellationToken).ConfigureAwait(false))
+                    if (await SkipBeforeAsync(_minIndexInclusive + index, en, cancellationToken).ConfigureAwait(false) && await en.MoveNextAsync().ConfigureAwait(false))
                     {
                         return new Maybe<TSource>(en.Current);
                     }
@@ -239,11 +239,11 @@ namespace System.Linq
 
         public async Task<Maybe<TSource>> TryGetFirstAsync(CancellationToken cancellationToken)
         {
-            var en = _source.GetAsyncEnumerator();
+            var en = _source.GetAsyncEnumerator(cancellationToken);
 
             try
             {
-                if (await SkipBeforeFirstAsync(en, cancellationToken).ConfigureAwait(false) && await en.MoveNextAsync(cancellationToken).ConfigureAwait(false))
+                if (await SkipBeforeFirstAsync(en, cancellationToken).ConfigureAwait(false) && await en.MoveNextAsync().ConfigureAwait(false))
                 {
                     return new Maybe<TSource>(en.Current);
                 }
@@ -258,11 +258,11 @@ namespace System.Linq
 
         public async Task<Maybe<TSource>> TryGetLastAsync(CancellationToken cancellationToken)
         {
-            var en = _source.GetAsyncEnumerator();
+            var en = _source.GetAsyncEnumerator(cancellationToken);
 
             try
             {
-                if (await SkipBeforeFirstAsync(en, cancellationToken).ConfigureAwait(false) && await en.MoveNextAsync(cancellationToken).ConfigureAwait(false))
+                if (await SkipBeforeFirstAsync(en, cancellationToken).ConfigureAwait(false) && await en.MoveNextAsync().ConfigureAwait(false))
                 {
                     var remaining = Limit - 1; // Max number of items left, not counting the current element.
                     var comparand = HasLimit ? 0 : int.MinValue; // If we don't have an upper bound, have the comparison always return true.
@@ -288,11 +288,11 @@ namespace System.Linq
 
         public async Task<TSource[]> ToArrayAsync(CancellationToken cancellationToken)
         {
-            var en = _source.GetAsyncEnumerator();
+            var en = _source.GetAsyncEnumerator(cancellationToken);
 
             try
             {
-                if (await SkipBeforeFirstAsync(en, cancellationToken).ConfigureAwait(false) && await en.MoveNextAsync(cancellationToken).ConfigureAwait(false))
+                if (await SkipBeforeFirstAsync(en, cancellationToken).ConfigureAwait(false) && await en.MoveNextAsync().ConfigureAwait(false))
                 {
                     var remaining = Limit - 1; // Max number of items left, not counting the current element.
                     var comparand = HasLimit ? 0 : int.MinValue; // If we don't have an upper bound, have the comparison always return true.
@@ -305,7 +305,7 @@ namespace System.Linq
                         remaining--;
                         builder.Add(en.Current);
                     }
-                    while (remaining >= comparand && await en.MoveNextAsync(cancellationToken).ConfigureAwait(false));
+                    while (remaining >= comparand && await en.MoveNextAsync().ConfigureAwait(false));
 
                     return builder.ToArray();
                 }
@@ -326,11 +326,11 @@ namespace System.Linq
         {
             var list = new List<TSource>();
 
-            var en = _source.GetAsyncEnumerator();
+            var en = _source.GetAsyncEnumerator(cancellationToken);
 
             try
             {
-                if (await SkipBeforeFirstAsync(en, cancellationToken).ConfigureAwait(false) && await en.MoveNextAsync(cancellationToken).ConfigureAwait(false))
+                if (await SkipBeforeFirstAsync(en, cancellationToken).ConfigureAwait(false) && await en.MoveNextAsync().ConfigureAwait(false))
                 {
                     var remaining = Limit - 1; // Max number of items left, not counting the current element.
                     var comparand = HasLimit ? 0 : int.MinValue; // If we don't have an upper bound, have the comparison always return true.
@@ -340,7 +340,7 @@ namespace System.Linq
                         remaining--;
                         list.Add(en.Current);
                     }
-                    while (remaining >= comparand && await en.MoveNextAsync(cancellationToken).ConfigureAwait(false));
+                    while (remaining >= comparand && await en.MoveNextAsync().ConfigureAwait(false));
                 }
             }
             finally
@@ -371,7 +371,7 @@ namespace System.Linq
 
             for (uint i = 0; i < index; i++)
             {
-                if (!await en.MoveNextAsync(cancellationToken).ConfigureAwait(false))
+                if (!await en.MoveNextAsync().ConfigureAwait(false))
                 {
                     return i;
                 }

+ 2 - 2
Ix.NET/Source/System.Linq.Async/System/Linq/AsyncEnumerator.cs

@@ -119,7 +119,7 @@ namespace System.Collections.Generic
                 this.dispose = dispose;
 
                 // Explicit call to initialize enumerator mode
-                GetAsyncEnumerator();
+                GetAsyncEnumerator(default);
             }
 
             public override AsyncIterator<T> Clone()
@@ -139,7 +139,7 @@ namespace System.Collections.Generic
                 await base.DisposeAsync().ConfigureAwait(false);
             }
 
-            protected override async ValueTask<bool> MoveNextCore()
+            protected override async ValueTask<bool> MoveNextCore(CancellationToken cancellationToken)
             {
                 switch (state)
                 {

+ 7 - 5
Ix.NET/Source/System.Linq.Async/System/Linq/AsyncIterator.cs

@@ -16,23 +16,25 @@ namespace System.Linq
 
         internal TSource current;
         internal AsyncIteratorState state = AsyncIteratorState.New;
+        internal CancellationToken cancellationToken;
 
         protected AsyncIterator()
         {
             threadId = Environment.CurrentManagedThreadId;
         }
 
-        public IAsyncEnumerator<TSource> GetAsyncEnumerator()
+        public IAsyncEnumerator<TSource> GetAsyncEnumerator(CancellationToken cancellationToken)
         {
             var enumerator = state == AsyncIteratorState.New && threadId == Environment.CurrentManagedThreadId
                 ? this
                 : Clone();
 
             enumerator.state = AsyncIteratorState.Allocated;
+            enumerator.cancellationToken = cancellationToken;
 
             try
             {
-                enumerator.OnGetEnumerator();
+                enumerator.OnGetEnumerator(cancellationToken);
             }
             catch
             {
@@ -75,7 +77,7 @@ namespace System.Linq
 
             try
             {
-                var result = await MoveNextCore().ConfigureAwait(false);
+                var result = await MoveNextCore(cancellationToken).ConfigureAwait(false);
 
                 currentIsInvalid = !result; // if move next is false, invalid otherwise valid
 
@@ -111,9 +113,9 @@ namespace System.Linq
             return new AsyncEnumerable.WhereEnumerableAsyncIteratorWithTask<TSource>(this, predicate);
         }
 
-        protected abstract ValueTask<bool> MoveNextCore();
+        protected abstract ValueTask<bool> MoveNextCore(CancellationToken cancellationToken);
 
-        protected virtual void OnGetEnumerator()
+        protected virtual void OnGetEnumerator(CancellationToken cancellationToken)
         {
         }
     }

+ 1 - 1
Ix.NET/Source/System.Linq.Async/System/Linq/AsyncListPartition.cs

@@ -38,7 +38,7 @@ namespace System.Linq
             return new AsyncListPartition<TSource>(_source, _minIndexInclusive, _maxIndexInclusive);
         }
 
-        protected override async ValueTask<bool> MoveNextCore()
+        protected override async ValueTask<bool> MoveNextCore(CancellationToken cancellationToken)
         {
             if ((uint)_index <= (uint)(_maxIndexInclusive - _minIndexInclusive) && _index < _source.Count - _minIndexInclusive)
             {

+ 8 - 8
Ix.NET/Source/System.Linq.Async/System/Linq/Operators/Aggregate.cs

@@ -142,11 +142,11 @@ namespace System.Linq
         {
             var acc = seed;
 
-            var e = source.GetAsyncEnumerator();
+            var e = source.GetAsyncEnumerator(cancellationToken);
 
             try
             {
-                while (await e.MoveNextAsync(cancellationToken).ConfigureAwait(false))
+                while (await e.MoveNextAsync().ConfigureAwait(false))
                 {
                     acc = accumulator(acc, e.Current);
                 }
@@ -164,11 +164,11 @@ namespace System.Linq
             var first = true;
             var acc = default(TSource);
 
-            var e = source.GetAsyncEnumerator();
+            var e = source.GetAsyncEnumerator(cancellationToken);
 
             try
             {
-                while (await e.MoveNextAsync(cancellationToken).ConfigureAwait(false))
+                while (await e.MoveNextAsync().ConfigureAwait(false))
                 {
                     acc = first ? e.Current : accumulator(acc, e.Current);
                     first = false;
@@ -189,11 +189,11 @@ namespace System.Linq
         {
             var acc = seed;
 
-            var e = source.GetAsyncEnumerator();
+            var e = source.GetAsyncEnumerator(cancellationToken);
 
             try
             {
-                while (await e.MoveNextAsync(cancellationToken).ConfigureAwait(false))
+                while (await e.MoveNextAsync().ConfigureAwait(false))
                 {
                     acc = await accumulator(acc, e.Current).ConfigureAwait(false);
                 }
@@ -211,11 +211,11 @@ namespace System.Linq
             var first = true;
             var acc = default(TSource);
 
-            var e = source.GetAsyncEnumerator();
+            var e = source.GetAsyncEnumerator(cancellationToken);
 
             try
             {
-                while (await e.MoveNextAsync(cancellationToken).ConfigureAwait(false))
+                while (await e.MoveNextAsync().ConfigureAwait(false))
                 {
                     acc = first ? e.Current : await accumulator(acc, e.Current).ConfigureAwait(false);
                     first = false;

+ 4 - 4
Ix.NET/Source/System.Linq.Async/System/Linq/Operators/All.cs

@@ -52,11 +52,11 @@ namespace System.Linq
 
         private static async Task<bool> AllCore<TSource>(IAsyncEnumerable<TSource> source, Func<TSource, bool> predicate, CancellationToken cancellationToken)
         {
-            var e = source.GetAsyncEnumerator();
+            var e = source.GetAsyncEnumerator(cancellationToken);
 
             try
             {
-                while (await e.MoveNextAsync(cancellationToken).ConfigureAwait(false))
+                while (await e.MoveNextAsync().ConfigureAwait(false))
                 {
                     if (!predicate(e.Current))
                         return false;
@@ -72,11 +72,11 @@ namespace System.Linq
 
         private static async Task<bool> AllCore<TSource>(IAsyncEnumerable<TSource> source, Func<TSource, Task<bool>> predicate, CancellationToken cancellationToken)
         {
-            var e = source.GetAsyncEnumerator();
+            var e = source.GetAsyncEnumerator(cancellationToken);
 
             try
             {
-                while (await e.MoveNextAsync(cancellationToken).ConfigureAwait(false))
+                while (await e.MoveNextAsync().ConfigureAwait(false))
                 {
                     if (!await predicate(e.Current).ConfigureAwait(false))
                         return false;

+ 6 - 6
Ix.NET/Source/System.Linq.Async/System/Linq/Operators/Any.cs

@@ -63,11 +63,11 @@ namespace System.Linq
             if (source == null)
                 throw new ArgumentNullException(nameof(source));
 
-            var e = source.GetAsyncEnumerator();
+            var e = source.GetAsyncEnumerator(cancellationToken);
 
             try
             {
-                return await e.MoveNextAsync(cancellationToken).ConfigureAwait(false);
+                return await e.MoveNextAsync().ConfigureAwait(false);
             }
             finally
             {
@@ -77,11 +77,11 @@ namespace System.Linq
 
         private static async Task<bool> AnyCore<TSource>(IAsyncEnumerable<TSource> source, Func<TSource, bool> predicate, CancellationToken cancellationToken)
         {
-            var e = source.GetAsyncEnumerator();
+            var e = source.GetAsyncEnumerator(cancellationToken);
 
             try
             {
-                while (await e.MoveNextAsync(cancellationToken).ConfigureAwait(false))
+                while (await e.MoveNextAsync().ConfigureAwait(false))
                 {
                     if (predicate(e.Current))
                         return true;
@@ -97,11 +97,11 @@ namespace System.Linq
 
         private static async Task<bool> AnyCore<TSource>(IAsyncEnumerable<TSource> source, Func<TSource, Task<bool>> predicate, CancellationToken cancellationToken)
         {
-            var e = source.GetAsyncEnumerator();
+            var e = source.GetAsyncEnumerator(cancellationToken);
 
             try
             {
-                while (await e.MoveNextAsync(cancellationToken).ConfigureAwait(false))
+                while (await e.MoveNextAsync().ConfigureAwait(false))
                 {
                     if (await predicate(e.Current).ConfigureAwait(false))
                         return true;

+ 14 - 14
Ix.NET/Source/System.Linq.Async/System/Linq/Operators/AppendPrepend.cs

@@ -49,10 +49,10 @@ namespace System.Linq
                 this.source = source;
             }
 
-            protected void GetSourceEnumerator()
+            protected void GetSourceEnumerator(CancellationToken cancellationToken)
             {
                 Debug.Assert(enumerator == null);
-                enumerator = source.GetAsyncEnumerator();
+                enumerator = source.GetAsyncEnumerator(cancellationToken);
             }
 
             public abstract AppendPrependAsyncIterator<TSource> Append(TSource item);
@@ -110,7 +110,7 @@ namespace System.Linq
                 return new AppendPrepend1AsyncIterator<TSource>(source, item, appending);
             }
 
-            protected override async ValueTask<bool> MoveNextCore()
+            protected override async ValueTask<bool> MoveNextCore(CancellationToken cancellationToken)
             {
                 switch (state)
                 {
@@ -128,7 +128,7 @@ namespace System.Linq
                     case AsyncIteratorState.Iterating:
                         if (!hasEnumerator)
                         {
-                            GetSourceEnumerator();
+                            GetSourceEnumerator(cancellationToken);
                             hasEnumerator = true;
                         }
 
@@ -204,11 +204,11 @@ namespace System.Linq
                 }
                 else
                 {
-                    var en = source.GetAsyncEnumerator();
+                    var en = source.GetAsyncEnumerator(cancellationToken);
 
                     try
                     {
-                        while (await en.MoveNextAsync(cancellationToken).ConfigureAwait(false))
+                        while (await en.MoveNextAsync().ConfigureAwait(false))
                         {
                             array[index] = en.Current;
                             ++index;
@@ -239,11 +239,11 @@ namespace System.Linq
                 }
 
 
-                var en = source.GetAsyncEnumerator();
+                var en = source.GetAsyncEnumerator(cancellationToken);
 
                 try
                 {
-                    while (await en.MoveNextAsync(cancellationToken).ConfigureAwait(false))
+                    while (await en.MoveNextAsync().ConfigureAwait(false))
                     {
                         list.Add(en.Current);
                     }
@@ -315,7 +315,7 @@ namespace System.Linq
                 await base.DisposeAsync().ConfigureAwait(false);
             }
 
-            protected override async ValueTask<bool> MoveNextCore()
+            protected override async ValueTask<bool> MoveNextCore(CancellationToken cancellationToken)
             {
                 switch (state)
                 {
@@ -340,7 +340,7 @@ namespace System.Linq
                                     return true;
                                 }
 
-                                GetSourceEnumerator();
+                                GetSourceEnumerator(cancellationToken);
                                 mode = 3;
                                 goto case 3;
 
@@ -410,11 +410,11 @@ namespace System.Linq
                 }
                 else
                 {
-                    var en = source.GetAsyncEnumerator();
+                    var en = source.GetAsyncEnumerator(cancellationToken);
 
                     try
                     {
-                        while (await en.MoveNextAsync(cancellationToken).ConfigureAwait(false))
+                        while (await en.MoveNextAsync().ConfigureAwait(false))
                         {
                             array[index] = en.Current;
                             ++index;
@@ -445,11 +445,11 @@ namespace System.Linq
                     list.Add(n.Item);
                 }
 
-                var en = source.GetAsyncEnumerator();
+                var en = source.GetAsyncEnumerator(cancellationToken);
 
                 try
                 {
-                    while (await en.MoveNextAsync(cancellationToken).ConfigureAwait(false))
+                    while (await en.MoveNextAsync().ConfigureAwait(false))
                     {
                         list.Add(en.Current);
                     }

+ 30 - 30
Ix.NET/Source/System.Linq.Async/System/Linq/Operators/Average.cs

@@ -12,11 +12,11 @@ namespace System.Linq
     {
         private static async Task<double> AverageCore(this IAsyncEnumerable<int> source, CancellationToken cancellationToken)
         {
-            var e = source.GetAsyncEnumerator();
+            var e = source.GetAsyncEnumerator(cancellationToken);
 
             try
             {
-                if (!await e.MoveNextAsync(cancellationToken).ConfigureAwait(false))
+                if (!await e.MoveNextAsync().ConfigureAwait(false))
                 {
                     throw new InvalidOperationException(Strings.NO_ELEMENTS);
                 }
@@ -25,7 +25,7 @@ namespace System.Linq
                 long count = 1;
                 checked
                 {
-                    while (await e.MoveNextAsync(cancellationToken).ConfigureAwait(false))
+                    while (await e.MoveNextAsync().ConfigureAwait(false))
                     {
                         sum += e.Current;
                         ++count;
@@ -42,11 +42,11 @@ namespace System.Linq
 
         private static async Task<double?> AverageCore(IAsyncEnumerable<int?> source, CancellationToken cancellationToken)
         {
-            var e = source.GetAsyncEnumerator();
+            var e = source.GetAsyncEnumerator(cancellationToken);
 
             try
             {
-                while (await e.MoveNextAsync(cancellationToken).ConfigureAwait(false))
+                while (await e.MoveNextAsync().ConfigureAwait(false))
                 {
                     var v = e.Current;
                     if (v.HasValue)
@@ -55,7 +55,7 @@ namespace System.Linq
                         long count = 1;
                         checked
                         {
-                            while (await e.MoveNextAsync(cancellationToken).ConfigureAwait(false))
+                            while (await e.MoveNextAsync().ConfigureAwait(false))
                             {
                                 v = e.Current;
                                 if (v.HasValue)
@@ -80,11 +80,11 @@ namespace System.Linq
 
         private static async Task<double> AverageCore(IAsyncEnumerable<long> source, CancellationToken cancellationToken)
         {
-            var e = source.GetAsyncEnumerator();
+            var e = source.GetAsyncEnumerator(cancellationToken);
 
             try
             {
-                if (!await e.MoveNextAsync(cancellationToken).ConfigureAwait(false))
+                if (!await e.MoveNextAsync().ConfigureAwait(false))
                 {
                     throw new InvalidOperationException(Strings.NO_ELEMENTS);
                 }
@@ -93,7 +93,7 @@ namespace System.Linq
                 long count = 1;
                 checked
                 {
-                    while (await e.MoveNextAsync(cancellationToken).ConfigureAwait(false))
+                    while (await e.MoveNextAsync().ConfigureAwait(false))
                     {
                         sum += e.Current;
                         ++count;
@@ -110,11 +110,11 @@ namespace System.Linq
 
         private static async Task<double?> AverageCore(IAsyncEnumerable<long?> source, CancellationToken cancellationToken)
         {
-            var e = source.GetAsyncEnumerator();
+            var e = source.GetAsyncEnumerator(cancellationToken);
 
             try
             {
-                while (await e.MoveNextAsync(cancellationToken).ConfigureAwait(false))
+                while (await e.MoveNextAsync().ConfigureAwait(false))
                 {
                     var v = e.Current;
                     if (v.HasValue)
@@ -123,7 +123,7 @@ namespace System.Linq
                         long count = 1;
                         checked
                         {
-                            while (await e.MoveNextAsync(cancellationToken).ConfigureAwait(false))
+                            while (await e.MoveNextAsync().ConfigureAwait(false))
                             {
                                 v = e.Current;
                                 if (v.HasValue)
@@ -148,18 +148,18 @@ namespace System.Linq
 
         private static async Task<double> AverageCore(IAsyncEnumerable<double> source, CancellationToken cancellationToken)
         {
-            var e = source.GetAsyncEnumerator();
+            var e = source.GetAsyncEnumerator(cancellationToken);
 
             try
             {
-                if (!await e.MoveNextAsync(cancellationToken).ConfigureAwait(false))
+                if (!await e.MoveNextAsync().ConfigureAwait(false))
                 {
                     throw new InvalidOperationException(Strings.NO_ELEMENTS);
                 }
 
                 var sum = e.Current;
                 long count = 1;
-                while (await e.MoveNextAsync(cancellationToken).ConfigureAwait(false))
+                while (await e.MoveNextAsync().ConfigureAwait(false))
                 {
                     // There is an opportunity to short-circuit here, in that if e.Current is
                     // ever NaN then the result will always be NaN. Assuming that this case is
@@ -178,11 +178,11 @@ namespace System.Linq
 
         private static async Task<double?> AverageCore(IAsyncEnumerable<double?> source, CancellationToken cancellationToken)
         {
-            var e = source.GetAsyncEnumerator();
+            var e = source.GetAsyncEnumerator(cancellationToken);
 
             try
             {
-                while (await e.MoveNextAsync(cancellationToken).ConfigureAwait(false))
+                while (await e.MoveNextAsync().ConfigureAwait(false))
                 {
                     var v = e.Current;
                     if (v.HasValue)
@@ -191,7 +191,7 @@ namespace System.Linq
                         long count = 1;
                         checked
                         {
-                            while (await e.MoveNextAsync(cancellationToken).ConfigureAwait(false))
+                            while (await e.MoveNextAsync().ConfigureAwait(false))
                             {
                                 v = e.Current;
                                 if (v.HasValue)
@@ -216,18 +216,18 @@ namespace System.Linq
 
         private static async Task<float> AverageCore(IAsyncEnumerable<float> source, CancellationToken cancellationToken)
         {
-            var e = source.GetAsyncEnumerator();
+            var e = source.GetAsyncEnumerator(cancellationToken);
 
             try
             {
-                if (!await e.MoveNextAsync(cancellationToken).ConfigureAwait(false))
+                if (!await e.MoveNextAsync().ConfigureAwait(false))
                 {
                     throw new InvalidOperationException(Strings.NO_ELEMENTS);
                 }
 
                 double sum = e.Current;
                 long count = 1;
-                while (await e.MoveNextAsync(cancellationToken).ConfigureAwait(false))
+                while (await e.MoveNextAsync().ConfigureAwait(false))
                 {
                     sum += e.Current;
                     ++count;
@@ -243,11 +243,11 @@ namespace System.Linq
 
         private static async Task<float?> AverageCore(IAsyncEnumerable<float?> source, CancellationToken cancellationToken)
         {
-            var e = source.GetAsyncEnumerator();
+            var e = source.GetAsyncEnumerator(cancellationToken);
 
             try
             {
-                while (await e.MoveNextAsync(cancellationToken).ConfigureAwait(false))
+                while (await e.MoveNextAsync().ConfigureAwait(false))
                 {
                     var v = e.Current;
                     if (v.HasValue)
@@ -256,7 +256,7 @@ namespace System.Linq
                         long count = 1;
                         checked
                         {
-                            while (await e.MoveNextAsync(cancellationToken).ConfigureAwait(false))
+                            while (await e.MoveNextAsync().ConfigureAwait(false))
                             {
                                 v = e.Current;
                                 if (v.HasValue)
@@ -281,18 +281,18 @@ namespace System.Linq
 
         private static async Task<decimal> AverageCore(IAsyncEnumerable<decimal> source, CancellationToken cancellationToken)
         {
-            var e = source.GetAsyncEnumerator();
+            var e = source.GetAsyncEnumerator(cancellationToken);
 
             try
             {
-                if (!await e.MoveNextAsync(cancellationToken).ConfigureAwait(false))
+                if (!await e.MoveNextAsync().ConfigureAwait(false))
                 {
                     throw new InvalidOperationException(Strings.NO_ELEMENTS);
                 }
 
                 var sum = e.Current;
                 long count = 1;
-                while (await e.MoveNextAsync(cancellationToken).ConfigureAwait(false))
+                while (await e.MoveNextAsync().ConfigureAwait(false))
                 {
                     sum += e.Current;
                     ++count;
@@ -308,18 +308,18 @@ namespace System.Linq
 
         private static async Task<decimal?> AverageCore(IAsyncEnumerable<decimal?> source, CancellationToken cancellationToken)
         {
-            var e = source.GetAsyncEnumerator();
+            var e = source.GetAsyncEnumerator(cancellationToken);
 
             try
             {
-                while (await e.MoveNextAsync(cancellationToken).ConfigureAwait(false))
+                while (await e.MoveNextAsync().ConfigureAwait(false))
                 {
                     var v = e.Current;
                     if (v.HasValue)
                     {
                         var sum = v.GetValueOrDefault();
                         long count = 1;
-                        while (await e.MoveNextAsync(cancellationToken).ConfigureAwait(false))
+                        while (await e.MoveNextAsync().ConfigureAwait(false))
                         {
                             v = e.Current;
                             if (v.HasValue)

+ 5 - 5
Ix.NET/Source/System.Linq.Async/System/Linq/Operators/Concat.cs

@@ -83,11 +83,11 @@ namespace System.Linq
                         break;
                     }
 
-                    var e = source.GetAsyncEnumerator();
+                    var e = source.GetAsyncEnumerator(cancellationToken);
 
                     try
                     {
-                        while (await e.MoveNextAsync(cancellationToken).ConfigureAwait(false))
+                        while (await e.MoveNextAsync().ConfigureAwait(false))
                         {
                             list.Add(e.Current);
                         }
@@ -137,11 +137,11 @@ namespace System.Linq
                 await base.DisposeAsync().ConfigureAwait(false);
             }
 
-            protected override async ValueTask<bool> MoveNextCore()
+            protected override async ValueTask<bool> MoveNextCore(CancellationToken cancellationToken)
             {
                 if (state == AsyncIteratorState.Allocated)
                 {
-                    enumerator = GetAsyncEnumerable(0).GetAsyncEnumerator();
+                    enumerator = GetAsyncEnumerable(0).GetAsyncEnumerator(cancellationToken);
                     state = AsyncIteratorState.Iterating;
                     counter = 2;
                 }
@@ -161,7 +161,7 @@ namespace System.Linq
                         if (next != null)
                         {
                             await enumerator.DisposeAsync().ConfigureAwait(false);
-                            enumerator = next.GetAsyncEnumerator();
+                            enumerator = next.GetAsyncEnumerator(cancellationToken);
                             continue;
                         }
 

+ 28 - 10
Ix.NET/Source/System.Linq.Async/System/Linq/Operators/Create.cs

@@ -12,6 +12,14 @@ namespace System.Linq
     public static partial class AsyncEnumerable
     {
         public static IAsyncEnumerable<T> CreateEnumerable<T>(Func<IAsyncEnumerator<T>> getEnumerator)
+        {
+            if (getEnumerator == null)
+                throw new ArgumentNullException(nameof(getEnumerator));
+
+            return new AnonymousAsyncEnumerable<T>(_ => getEnumerator());
+        }
+
+        public static IAsyncEnumerable<T> CreateEnumerable<T>(Func<CancellationToken, IAsyncEnumerator<T>> getEnumerator)
         {
             if (getEnumerator == null)
                 throw new ArgumentNullException(nameof(getEnumerator));
@@ -20,6 +28,14 @@ namespace System.Linq
         }
 
         public static IAsyncEnumerable<T> CreateEnumerable<T>(Func<Task<IAsyncEnumerator<T>>> getEnumerator)
+        {
+            if (getEnumerator == null)
+                throw new ArgumentNullException(nameof(getEnumerator));
+
+            return new AnonymousAsyncEnumerableWithTask<T>(_ => getEnumerator());
+        }
+
+        public static IAsyncEnumerable<T> CreateEnumerable<T>(Func<CancellationToken, Task<IAsyncEnumerator<T>>> getEnumerator)
         {
             if (getEnumerator == null)
                 throw new ArgumentNullException(nameof(getEnumerator));
@@ -39,41 +55,43 @@ namespace System.Linq
 
         private sealed class AnonymousAsyncEnumerable<T> : IAsyncEnumerable<T>
         {
-            private readonly Func<IAsyncEnumerator<T>> getEnumerator;
+            private readonly Func<CancellationToken, IAsyncEnumerator<T>> getEnumerator;
 
-            public AnonymousAsyncEnumerable(Func<IAsyncEnumerator<T>> getEnumerator)
+            public AnonymousAsyncEnumerable(Func<CancellationToken, IAsyncEnumerator<T>> getEnumerator)
             {
                 Debug.Assert(getEnumerator != null);
 
                 this.getEnumerator = getEnumerator;
             }
 
-            public IAsyncEnumerator<T> GetAsyncEnumerator() => getEnumerator();
+            public IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken) => getEnumerator(cancellationToken);
         }
 
         private sealed class AnonymousAsyncEnumerableWithTask<T> : IAsyncEnumerable<T>
         {
-            private readonly Func<Task<IAsyncEnumerator<T>>> getEnumerator;
+            private readonly Func<CancellationToken, Task<IAsyncEnumerator<T>>> getEnumerator;
 
-            public AnonymousAsyncEnumerableWithTask(Func<Task<IAsyncEnumerator<T>>> getEnumerator)
+            public AnonymousAsyncEnumerableWithTask(Func<CancellationToken, Task<IAsyncEnumerator<T>>> getEnumerator)
             {
                 Debug.Assert(getEnumerator != null);
 
                 this.getEnumerator = getEnumerator;
             }
 
-            public IAsyncEnumerator<T> GetAsyncEnumerator() => new Enumerator(getEnumerator);
+            public IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken) => new Enumerator(getEnumerator, cancellationToken);
 
             private sealed class Enumerator : IAsyncEnumerator<T>
             {
-                private Func<Task<IAsyncEnumerator<T>>> getEnumerator;
+                private Func<CancellationToken, Task<IAsyncEnumerator<T>>> getEnumerator;
+                private readonly CancellationToken cancellationToken;
                 private IAsyncEnumerator<T> enumerator;
 
-                public Enumerator(Func<Task<IAsyncEnumerator<T>>> getEnumerator)
+                public Enumerator(Func<CancellationToken, Task<IAsyncEnumerator<T>>> getEnumerator, CancellationToken cancellationToken)
                 {
                     Debug.Assert(getEnumerator != null);
 
                     this.getEnumerator = getEnumerator;
+                    this.cancellationToken = cancellationToken;
                 }
 
                 public T Current
@@ -111,11 +129,11 @@ namespace System.Linq
                 {
                     try
                     {
-                        enumerator = await getEnumerator().ConfigureAwait(false);
+                        enumerator = await getEnumerator(cancellationToken).ConfigureAwait(false);
                     }
                     catch (Exception ex)
                     {
-                        enumerator = Throw<T>(ex).GetAsyncEnumerator();
+                        enumerator = Throw<T>(ex).GetAsyncEnumerator(cancellationToken);
                         throw;
                     }
                     finally

+ 2 - 2
Ix.NET/Source/System.Linq.Async/System/Linq/Operators/DefaultIfEmpty.cs

@@ -63,12 +63,12 @@ namespace System.Linq
                 await base.DisposeAsync().ConfigureAwait(false);
             }
 
-            protected override async ValueTask<bool> MoveNextCore()
+            protected override async ValueTask<bool> MoveNextCore(CancellationToken cancellationToken)
             {
                 switch (state)
                 {
                     case AsyncIteratorState.Allocated:
-                        enumerator = source.GetAsyncEnumerator();
+                        enumerator = source.GetAsyncEnumerator(cancellationToken);
                         if (await enumerator.MoveNextAsync().ConfigureAwait(false))
                         {
                             current = enumerator.Current;

+ 3 - 3
Ix.NET/Source/System.Linq.Async/System/Linq/Operators/Distinct.cs

@@ -79,12 +79,12 @@ namespace System.Linq
                 await base.DisposeAsync().ConfigureAwait(false);
             }
 
-            protected override async ValueTask<bool> MoveNextCore()
+            protected override async ValueTask<bool> MoveNextCore(CancellationToken cancellationToken)
             {
                 switch (state)
                 {
                     case AsyncIteratorState.Allocated:
-                        enumerator = source.GetAsyncEnumerator();
+                        enumerator = source.GetAsyncEnumerator(cancellationToken);
                         if (!await enumerator.MoveNextAsync().ConfigureAwait(false))
                         {
                             await DisposeAsync().ConfigureAwait(false);
@@ -121,7 +121,7 @@ namespace System.Linq
             {
                 var s = new Set<TSource>(comparer);
 
-                await s.UnionWithAsync(source);
+                await s.UnionWithAsync(source, cancellationToken);
 
                 return s;
             }

+ 2 - 2
Ix.NET/Source/System.Linq.Async/System/Linq/Operators/ElementAt.cs

@@ -52,11 +52,11 @@ namespace System.Linq
             }
             else
             {
-                var e = source.GetAsyncEnumerator();
+                var e = source.GetAsyncEnumerator(cancellationToken);
 
                 try
                 {
-                    while (await e.MoveNextAsync(cancellationToken).ConfigureAwait(false))
+                    while (await e.MoveNextAsync().ConfigureAwait(false))
                     {
                         if (index == 0)
                         {

+ 2 - 2
Ix.NET/Source/System.Linq.Async/System/Linq/Operators/ElementAtOrDefault.cs

@@ -48,11 +48,11 @@ namespace System.Linq
             }
             else
             {
-                var e = source.GetAsyncEnumerator();
+                var e = source.GetAsyncEnumerator(cancellationToken);
 
                 try
                 {
-                    while (await e.MoveNextAsync(cancellationToken).ConfigureAwait(false))
+                    while (await e.MoveNextAsync().ConfigureAwait(false))
                     {
                         if (index == 0)
                         {

+ 1 - 1
Ix.NET/Source/System.Linq.Async/System/Linq/Operators/Empty.cs

@@ -42,7 +42,7 @@ namespace System.Linq
 
             public ValueTask<bool> MoveNextAsync() => TaskExt.False;
 
-            public IAsyncEnumerator<TValue> GetAsyncEnumerator() => this;
+            public IAsyncEnumerator<TValue> GetAsyncEnumerator(CancellationToken cancellationToken) => this;
 
             public ValueTask DisposeAsync() => TaskExt.CompletedTask;
         }

+ 3 - 2
Ix.NET/Source/System.Linq.Async/System/Linq/Operators/Except.cs

@@ -4,6 +4,7 @@
 
 using System.Collections.Generic;
 using System.Diagnostics;
+using System.Threading;
 using System.Threading.Tasks;
 
 namespace System.Linq
@@ -74,12 +75,12 @@ namespace System.Linq
                 await base.DisposeAsync().ConfigureAwait(false);
             }
 
-            protected override async ValueTask<bool> MoveNextCore()
+            protected override async ValueTask<bool> MoveNextCore(CancellationToken cancellationToken)
             {
                 switch (state)
                 {
                     case AsyncIteratorState.Allocated:
-                        firstEnumerator = first.GetAsyncEnumerator();
+                        firstEnumerator = first.GetAsyncEnumerator(cancellationToken);
                         set = new Set<TSource>(comparer);
                         setFilled = false;
                         fillSetTask = FillSetAsync();

+ 2 - 2
Ix.NET/Source/System.Linq.Async/System/Linq/Operators/First.cs

@@ -86,11 +86,11 @@ namespace System.Linq
             }
             else
             {
-                var e = source.GetAsyncEnumerator();
+                var e = source.GetAsyncEnumerator(cancellationToken);
 
                 try
                 {
-                    if (await e.MoveNextAsync(cancellationToken).ConfigureAwait(false))
+                    if (await e.MoveNextAsync().ConfigureAwait(false))
                     {
                         return e.Current;
                     }

+ 2 - 2
Ix.NET/Source/System.Linq.Async/System/Linq/Operators/FirstOrDefault.cs

@@ -86,11 +86,11 @@ namespace System.Linq
             }
             else
             {
-                var e = source.GetAsyncEnumerator();
+                var e = source.GetAsyncEnumerator(cancellationToken);
 
                 try
                 {
-                    if (await e.MoveNextAsync(cancellationToken).ConfigureAwait(false))
+                    if (await e.MoveNextAsync().ConfigureAwait(false))
                     {
                         return e.Current;
                     }

+ 4 - 4
Ix.NET/Source/System.Linq.Async/System/Linq/Operators/ForEach.cs

@@ -154,11 +154,11 @@ namespace System.Linq
         {
             var index = 0;
 
-            var e = source.GetAsyncEnumerator();
+            var e = source.GetAsyncEnumerator(cancellationToken);
 
             try
             {
-                while (await e.MoveNextAsync(cancellationToken).ConfigureAwait(false))
+                while (await e.MoveNextAsync().ConfigureAwait(false))
                 {
                     action(e.Current, checked(index++));
                 }
@@ -173,11 +173,11 @@ namespace System.Linq
         {
             var index = 0;
 
-            var e = source.GetAsyncEnumerator();
+            var e = source.GetAsyncEnumerator(cancellationToken);
 
             try
             {
-                while (await e.MoveNextAsync(cancellationToken).ConfigureAwait(false))
+                while (await e.MoveNextAsync().ConfigureAwait(false))
                 {
                     await action(e.Current, checked(index++), cancellationToken).ConfigureAwait(false);
                 }

+ 7 - 7
Ix.NET/Source/System.Linq.Async/System/Linq/Operators/GroupBy.cs

@@ -259,7 +259,7 @@ namespace System.Linq
                 await base.DisposeAsync().ConfigureAwait(false);
             }
 
-            protected override async ValueTask<bool> MoveNextCore()
+            protected override async ValueTask<bool> MoveNextCore(CancellationToken cancellationToken)
             {
                 switch (state)
                 {
@@ -348,13 +348,13 @@ namespace System.Linq
                 await base.DisposeAsync().ConfigureAwait(false);
             }
 
-            protected override async ValueTask<bool> MoveNextCore()
+            protected override async ValueTask<bool> MoveNextCore(CancellationToken cancellationToken)
             {
                 switch (state)
                 {
                     case AsyncIteratorState.Allocated:
                         lookup = await Internal.LookupWithTask<TKey, TSource>.CreateAsync(source, keySelector, comparer).ConfigureAwait(false);
-                        enumerator = lookup.Select(async g => await resultSelector(g.Key, g).ConfigureAwait(false)).GetAsyncEnumerator();
+                        enumerator = lookup.Select(async g => await resultSelector(g.Key, g).ConfigureAwait(false)).GetAsyncEnumerator(cancellationToken);
                         state = AsyncIteratorState.Iterating;
                         goto case AsyncIteratorState.Iterating;
 
@@ -437,7 +437,7 @@ namespace System.Linq
                 await base.DisposeAsync().ConfigureAwait(false);
             }
 
-            protected override async ValueTask<bool> MoveNextCore()
+            protected override async ValueTask<bool> MoveNextCore(CancellationToken cancellationToken)
             {
                 switch (state)
                 {
@@ -526,7 +526,7 @@ namespace System.Linq
                 await base.DisposeAsync().ConfigureAwait(false);
             }
 
-            protected override async ValueTask<bool> MoveNextCore()
+            protected override async ValueTask<bool> MoveNextCore(CancellationToken cancellationToken)
             {
                 switch (state)
                 {
@@ -611,7 +611,7 @@ namespace System.Linq
                 await base.DisposeAsync().ConfigureAwait(false);
             }
 
-            protected override async ValueTask<bool> MoveNextCore()
+            protected override async ValueTask<bool> MoveNextCore(CancellationToken cancellationToken)
             {
                 switch (state)
                 {
@@ -696,7 +696,7 @@ namespace System.Linq
                 await base.DisposeAsync().ConfigureAwait(false);
             }
 
-            protected override async ValueTask<bool> MoveNextCore()
+            protected override async ValueTask<bool> MoveNextCore(CancellationToken cancellationToken)
             {
                 switch (state)
                 {

+ 19 - 10
Ix.NET/Source/System.Linq.Async/System/Linq/Operators/GroupJoin.cs

@@ -3,6 +3,7 @@
 // See the LICENSE file in the project root for more information. 
 
 using System.Collections.Generic;
+using System.Threading;
 using System.Threading.Tasks;
 
 namespace System.Linq
@@ -102,14 +103,15 @@ namespace System.Linq
                 _comparer = comparer;
             }
 
-            public IAsyncEnumerator<TResult> GetAsyncEnumerator()
+            public IAsyncEnumerator<TResult> GetAsyncEnumerator(CancellationToken cancellationToken)
                 => new GroupJoinAsyncEnumerator(
-                    _outer.GetAsyncEnumerator(),
+                    _outer.GetAsyncEnumerator(cancellationToken),
                     _inner,
                     _outerKeySelector,
                     _innerKeySelector,
                     _resultSelector,
-                    _comparer);
+                    _comparer,
+                    cancellationToken);
 
             private sealed class GroupJoinAsyncEnumerator : IAsyncEnumerator<TResult>
             {
@@ -119,6 +121,7 @@ namespace System.Linq
                 private readonly IAsyncEnumerator<TOuter> _outer;
                 private readonly Func<TOuter, TKey> _outerKeySelector;
                 private readonly Func<TOuter, IAsyncEnumerable<TInner>, TResult> _resultSelector;
+                private readonly CancellationToken _cancellationToken;
 
                 private Internal.Lookup<TKey, TInner> _lookup;
 
@@ -128,7 +131,8 @@ namespace System.Linq
                     Func<TOuter, TKey> outerKeySelector,
                     Func<TInner, TKey> innerKeySelector,
                     Func<TOuter, IAsyncEnumerable<TInner>, TResult> resultSelector,
-                    IEqualityComparer<TKey> comparer)
+                    IEqualityComparer<TKey> comparer,
+                    CancellationToken cancellationToken)
                 {
                     _outer = outer;
                     _inner = inner;
@@ -136,6 +140,7 @@ namespace System.Linq
                     _innerKeySelector = innerKeySelector;
                     _resultSelector = resultSelector;
                     _comparer = comparer;
+                    _cancellationToken = cancellationToken;
                 }
 
                 public async ValueTask<bool> MoveNextAsync()
@@ -148,7 +153,7 @@ namespace System.Linq
 
                     if (_lookup == null)
                     {
-                        _lookup = await Internal.Lookup<TKey, TInner>.CreateForJoinAsync(_inner, _innerKeySelector, _comparer).ConfigureAwait(false);
+                        _lookup = await Internal.Lookup<TKey, TInner>.CreateForJoinAsync(_inner, _innerKeySelector, _comparer, _cancellationToken).ConfigureAwait(false);
                     }
 
                     var item = _outer.Current;
@@ -192,14 +197,15 @@ namespace System.Linq
                 _comparer = comparer;
             }
 
-            public IAsyncEnumerator<TResult> GetAsyncEnumerator()
+            public IAsyncEnumerator<TResult> GetAsyncEnumerator(CancellationToken cancellationToken)
                 => new GroupJoinAsyncEnumeratorWithTask(
-                    _outer.GetAsyncEnumerator(),
+                    _outer.GetAsyncEnumerator(cancellationToken),
                     _inner,
                     _outerKeySelector,
                     _innerKeySelector,
                     _resultSelector,
-                    _comparer);
+                    _comparer,
+                    cancellationToken);
 
             private sealed class GroupJoinAsyncEnumeratorWithTask : IAsyncEnumerator<TResult>
             {
@@ -209,6 +215,7 @@ namespace System.Linq
                 private readonly IAsyncEnumerator<TOuter> _outer;
                 private readonly Func<TOuter, Task<TKey>> _outerKeySelector;
                 private readonly Func<TOuter, IAsyncEnumerable<TInner>, Task<TResult>> _resultSelector;
+                private readonly CancellationToken _cancellationToken;
 
                 private Internal.LookupWithTask<TKey, TInner> _lookup;
 
@@ -218,7 +225,8 @@ namespace System.Linq
                     Func<TOuter, Task<TKey>> outerKeySelector,
                     Func<TInner, Task<TKey>> innerKeySelector,
                     Func<TOuter, IAsyncEnumerable<TInner>, Task<TResult>> resultSelector,
-                    IEqualityComparer<TKey> comparer)
+                    IEqualityComparer<TKey> comparer,
+                    CancellationToken cancellationToken)
                 {
                     _outer = outer;
                     _inner = inner;
@@ -226,6 +234,7 @@ namespace System.Linq
                     _innerKeySelector = innerKeySelector;
                     _resultSelector = resultSelector;
                     _comparer = comparer;
+                    _cancellationToken = cancellationToken;
                 }
 
                 public async ValueTask<bool> MoveNextAsync()
@@ -238,7 +247,7 @@ namespace System.Linq
 
                     if (_lookup == null)
                     {
-                        _lookup = await Internal.LookupWithTask<TKey, TInner>.CreateForJoinAsync(_inner, _innerKeySelector, _comparer).ConfigureAwait(false);
+                        _lookup = await Internal.LookupWithTask<TKey, TInner>.CreateForJoinAsync(_inner, _innerKeySelector, _comparer, _cancellationToken).ConfigureAwait(false);
                     }
 
                     var item = _outer.Current;

+ 2 - 1
Ix.NET/Source/System.Linq.Async/System/Linq/Operators/Grouping.cs

@@ -4,6 +4,7 @@
 
 using System.Collections;
 using System.Collections.Generic;
+using System.Threading;
 
 // Note: The type here has to be internal as System.Linq has it's own public copy we're not using
 
@@ -103,6 +104,6 @@ namespace System.Linq.Internal
             }
         }
 
-        IAsyncEnumerator<TElement> IAsyncEnumerable<TElement>.GetAsyncEnumerator() => this.ToAsyncEnumerable().GetAsyncEnumerator();
+        IAsyncEnumerator<TElement> IAsyncEnumerable<TElement>.GetAsyncEnumerator(CancellationToken cancellationToken) => this.ToAsyncEnumerable().GetAsyncEnumerator(cancellationToken);
     }
 }

+ 3 - 2
Ix.NET/Source/System.Linq.Async/System/Linq/Operators/Intersect.cs

@@ -4,6 +4,7 @@
 
 using System.Collections.Generic;
 using System.Diagnostics;
+using System.Threading;
 using System.Threading.Tasks;
 
 namespace System.Linq
@@ -74,12 +75,12 @@ namespace System.Linq
                 await base.DisposeAsync().ConfigureAwait(false);
             }
 
-            protected override async ValueTask<bool> MoveNextCore()
+            protected override async ValueTask<bool> MoveNextCore(CancellationToken cancellationToken)
             {
                 switch (state)
                 {
                     case AsyncIteratorState.Allocated:
-                        firstEnumerator = first.GetAsyncEnumerator();
+                        firstEnumerator = first.GetAsyncEnumerator(cancellationToken);
                         set = new Set<TSource>(comparer);
                         setFilled = false;
                         fillSetTask = FillSet();

+ 7 - 6
Ix.NET/Source/System.Linq.Async/System/Linq/Operators/Join.cs

@@ -4,6 +4,7 @@
 
 using System.Collections.Generic;
 using System.Diagnostics;
+using System.Threading;
 using System.Threading.Tasks;
 
 namespace System.Linq
@@ -135,12 +136,12 @@ namespace System.Linq
             private const int State_For = 3;
             private const int State_While = 4;
 
-            protected override async ValueTask<bool> MoveNextCore()
+            protected override async ValueTask<bool> MoveNextCore(CancellationToken cancellationToken)
             {
                 switch (state)
                 {
                     case AsyncIteratorState.Allocated:
-                        outerEnumerator = outer.GetAsyncEnumerator();
+                        outerEnumerator = outer.GetAsyncEnumerator(cancellationToken);
                         mode = State_If;
                         state = AsyncIteratorState.Iterating;
                         goto case AsyncIteratorState.Iterating;
@@ -151,7 +152,7 @@ namespace System.Linq
                             case State_If:
                                 if (await outerEnumerator.MoveNextAsync().ConfigureAwait(false))
                                 {
-                                    lookup = await Internal.Lookup<TKey, TInner>.CreateForJoinAsync(inner, innerKeySelector, comparer).ConfigureAwait(false);
+                                    lookup = await Internal.Lookup<TKey, TInner>.CreateForJoinAsync(inner, innerKeySelector, comparer, cancellationToken).ConfigureAwait(false);
 
                                     if (lookup.Count != 0)
                                     {
@@ -263,12 +264,12 @@ namespace System.Linq
             private const int State_For = 3;
             private const int State_While = 4;
 
-            protected override async ValueTask<bool> MoveNextCore()
+            protected override async ValueTask<bool> MoveNextCore(CancellationToken cancellationToken)
             {
                 switch (state)
                 {
                     case AsyncIteratorState.Allocated:
-                        outerEnumerator = outer.GetAsyncEnumerator();
+                        outerEnumerator = outer.GetAsyncEnumerator(cancellationToken);
                         mode = State_If;
                         state = AsyncIteratorState.Iterating;
                         goto case AsyncIteratorState.Iterating;
@@ -279,7 +280,7 @@ namespace System.Linq
                             case State_If:
                                 if (await outerEnumerator.MoveNextAsync().ConfigureAwait(false))
                                 {
-                                    lookup = await Internal.LookupWithTask<TKey, TInner>.CreateForJoinAsync(inner, innerKeySelector, comparer).ConfigureAwait(false);
+                                    lookup = await Internal.LookupWithTask<TKey, TInner>.CreateForJoinAsync(inner, innerKeySelector, comparer, cancellationToken).ConfigureAwait(false);
 
                                     if (lookup.Count != 0)
                                     {

+ 2 - 2
Ix.NET/Source/System.Linq.Async/System/Linq/Operators/Last.cs

@@ -90,11 +90,11 @@ namespace System.Linq
                 var last = default(TSource);
                 var hasLast = false;
 
-                var e = source.GetAsyncEnumerator();
+                var e = source.GetAsyncEnumerator(cancellationToken);
 
                 try
                 {
-                    while (await e.MoveNextAsync(cancellationToken).ConfigureAwait(false))
+                    while (await e.MoveNextAsync().ConfigureAwait(false))
                     {
                         hasLast = true;
                         last = e.Current;

+ 2 - 2
Ix.NET/Source/System.Linq.Async/System/Linq/Operators/LastOrDefault.cs

@@ -90,11 +90,11 @@ namespace System.Linq
                 var last = default(TSource);
                 var hasLast = false;
 
-                var e = source.GetAsyncEnumerator();
+                var e = source.GetAsyncEnumerator(cancellationToken);
 
                 try
                 {
-                    while (await e.MoveNextAsync(cancellationToken).ConfigureAwait(false))
+                    while (await e.MoveNextAsync().ConfigureAwait(false))
                     {
                         hasLast = true;
                         last = e.Current;

+ 3 - 3
Ix.NET/Source/System.Linq.Async/System/Linq/Operators/Max.cs

@@ -69,16 +69,16 @@ namespace System.Linq
 
         private static async Task<TSource> MaxCore<TSource>(IAsyncEnumerable<TSource> source, IComparer<TSource> comparer, CancellationToken cancellationToken)
         {
-            var e = source.GetAsyncEnumerator();
+            var e = source.GetAsyncEnumerator(cancellationToken);
 
             try
             {
-                if (!await e.MoveNextAsync(cancellationToken).ConfigureAwait(false))
+                if (!await e.MoveNextAsync().ConfigureAwait(false))
                     throw new InvalidOperationException(Strings.NO_ELEMENTS);
 
                 var max = e.Current;
 
-                while (await e.MoveNextAsync(cancellationToken).ConfigureAwait(false))
+                while (await e.MoveNextAsync().ConfigureAwait(false))
                 {
                     var cur = e.Current;
 

+ 3 - 3
Ix.NET/Source/System.Linq.Async/System/Linq/Operators/Min.cs

@@ -69,16 +69,16 @@ namespace System.Linq
 
         private static async Task<TSource> MinCore<TSource>(IAsyncEnumerable<TSource> source, IComparer<TSource> comparer, CancellationToken cancellationToken)
         {
-            var e = source.GetAsyncEnumerator();
+            var e = source.GetAsyncEnumerator(cancellationToken);
 
             try
             {
-                if (!await e.MoveNextAsync(cancellationToken).ConfigureAwait(false))
+                if (!await e.MoveNextAsync().ConfigureAwait(false))
                     throw new InvalidOperationException(Strings.NO_ELEMENTS);
 
                 var min = e.Current;
 
-                while (await e.MoveNextAsync(cancellationToken).ConfigureAwait(false))
+                while (await e.MoveNextAsync().ConfigureAwait(false))
                 {
                     var cur = e.Current;
 

+ 14 - 13
Ix.NET/Source/System.Linq.Async/System/Linq/Operators/OrderedAsyncEnumerable.cs

@@ -4,6 +4,7 @@
 
 using System.Collections.Generic;
 using System.Diagnostics;
+using System.Threading;
 using System.Threading.Tasks;
 
 namespace System.Linq
@@ -23,7 +24,7 @@ namespace System.Linq
             return new OrderedAsyncEnumerableWithTask<TElement, TKey>(source, keySelector, comparer, descending, this);
         }
 
-        internal abstract Task Initialize();
+        internal abstract Task Initialize(CancellationToken cancellationToken);
     }
 
     internal sealed class OrderedAsyncEnumerable<TElement, TKey> : OrderedAsyncEnumerable<TElement>
@@ -71,13 +72,13 @@ namespace System.Linq
             await base.DisposeAsync().ConfigureAwait(false);
         }
 
-        protected override async ValueTask<bool> MoveNextCore()
+        protected override async ValueTask<bool> MoveNextCore(CancellationToken cancellationToken)
         {
             switch (state)
             {
                 case AsyncIteratorState.Allocated:
 
-                    await Initialize().ConfigureAwait(false);
+                    await Initialize(cancellationToken).ConfigureAwait(false);
 
                     enumerator = enumerable.GetEnumerator();
                     state = AsyncIteratorState.Iterating;
@@ -97,17 +98,17 @@ namespace System.Linq
             return false;
         }
 
-        internal override async Task Initialize()
+        internal override async Task Initialize(CancellationToken cancellationToken)
         {
             if (parent == null)
             {
-                var buffer = await source.ToList().ConfigureAwait(false);
+                var buffer = await source.ToList(cancellationToken).ConfigureAwait(false);
                 enumerable = (!@descending ? buffer.OrderBy(keySelector, comparer) : buffer.OrderByDescending(keySelector, comparer));
             }
             else
             {
-                parentEnumerator = parent.GetAsyncEnumerator();
-                await parent.Initialize().ConfigureAwait(false);
+                parentEnumerator = parent.GetAsyncEnumerator(cancellationToken);
+                await parent.Initialize(cancellationToken).ConfigureAwait(false);
                 enumerable = parent.enumerable.CreateOrderedEnumerable(keySelector, comparer, @descending);
             }
         }
@@ -158,13 +159,13 @@ namespace System.Linq
             await base.DisposeAsync().ConfigureAwait(false);
         }
 
-        protected override async ValueTask<bool> MoveNextCore()
+        protected override async ValueTask<bool> MoveNextCore(CancellationToken cancellationToken)
         {
             switch (state)
             {
                 case AsyncIteratorState.Allocated:
 
-                    await Initialize().ConfigureAwait(false);
+                    await Initialize(cancellationToken).ConfigureAwait(false);
 
                     enumerator = enumerable.GetEnumerator();
                     state = AsyncIteratorState.Iterating;
@@ -184,17 +185,17 @@ namespace System.Linq
             return false;
         }
 
-        internal override async Task Initialize()
+        internal override async Task Initialize(CancellationToken cancellationToken)
         {
             if (parent == null)
             {
-                var buffer = await source.ToList().ConfigureAwait(false);
+                var buffer = await source.ToList(cancellationToken).ConfigureAwait(false);
                 enumerable = (!@descending ? buffer.OrderByAsync(keySelector, comparer) : buffer.OrderByDescendingAsync(keySelector, comparer));
             }
             else
             {
-                parentEnumerator = parent.GetAsyncEnumerator();
-                await parent.Initialize().ConfigureAwait(false);
+                parentEnumerator = parent.GetAsyncEnumerator(cancellationToken);
+                await parent.Initialize(cancellationToken).ConfigureAwait(false);
                 enumerable = parent.enumerable.CreateOrderedEnumerableAsync(keySelector, comparer, @descending);
             }
         }

+ 1 - 1
Ix.NET/Source/System.Linq.Async/System/Linq/Operators/Range.cs

@@ -107,7 +107,7 @@ namespace System.Linq
 
             public Task<Maybe<int>> TryGetLastAsync(CancellationToken cancellationToken) => Task.FromResult(new Maybe<int>(end - 1));
 
-            protected override async ValueTask<bool> MoveNextCore()
+            protected override async ValueTask<bool> MoveNextCore(CancellationToken cancellationToken)
             {
                 switch (state)
                 {

+ 1 - 1
Ix.NET/Source/System.Linq.Async/System/Linq/Operators/Repeat.cs

@@ -58,7 +58,7 @@ namespace System.Linq
                 return Task.FromResult(res);
             }
 
-            protected override async ValueTask<bool> MoveNextCore()
+            protected override async ValueTask<bool> MoveNextCore(CancellationToken cancellationToken)
             {
                 switch (state)
                 {

+ 1 - 1
Ix.NET/Source/System.Linq.Async/System/Linq/Operators/Reverse.cs

@@ -89,7 +89,7 @@ namespace System.Linq
                 await base.DisposeAsync().ConfigureAwait(false);
             }
 
-            protected override async ValueTask<bool> MoveNextCore()
+            protected override async ValueTask<bool> MoveNextCore(CancellationToken cancellationToken)
             {
                 switch (state)
                 {

+ 10 - 10
Ix.NET/Source/System.Linq.Async/System/Linq/Operators/Select.cs

@@ -118,12 +118,12 @@ namespace System.Linq
                 return new SelectEnumerableAsyncIterator<TSource, TResult1>(source, CombineSelectors(this.selector, selector));
             }
 
-            protected override async ValueTask<bool> MoveNextCore()
+            protected override async ValueTask<bool> MoveNextCore(CancellationToken cancellationToken)
             {
                 switch (state)
                 {
                     case AsyncIteratorState.Allocated:
-                        enumerator = source.GetAsyncEnumerator();
+                        enumerator = source.GetAsyncEnumerator(cancellationToken);
                         state = AsyncIteratorState.Iterating;
                         goto case AsyncIteratorState.Iterating;
 
@@ -174,12 +174,12 @@ namespace System.Linq
                 await base.DisposeAsync().ConfigureAwait(false);
             }
 
-            protected override async ValueTask<bool> MoveNextCore()
+            protected override async ValueTask<bool> MoveNextCore(CancellationToken cancellationToken)
             {
                 switch (state)
                 {
                     case AsyncIteratorState.Allocated:
-                        enumerator = source.GetAsyncEnumerator();
+                        enumerator = source.GetAsyncEnumerator(cancellationToken);
                         index = -1;
                         state = AsyncIteratorState.Iterating;
                         goto case AsyncIteratorState.Iterating;
@@ -289,7 +289,7 @@ namespace System.Linq
                 return Task.FromResult(res);
             }
 
-            protected override async ValueTask<bool> MoveNextCore()
+            protected override async ValueTask<bool> MoveNextCore(CancellationToken cancellationToken)
             {
                 switch (state)
                 {
@@ -350,12 +350,12 @@ namespace System.Linq
                 return new SelectEnumerableAsyncIteratorWithTask<TSource, TResult1>(source, CombineSelectors(this.selector, selector));
             }
 
-            protected override async ValueTask<bool> MoveNextCore()
+            protected override async ValueTask<bool> MoveNextCore(CancellationToken cancellationToken)
             {
                 switch (state)
                 {
                     case AsyncIteratorState.Allocated:
-                        enumerator = source.GetAsyncEnumerator();
+                        enumerator = source.GetAsyncEnumerator(cancellationToken);
                         state = AsyncIteratorState.Iterating;
                         goto case AsyncIteratorState.Iterating;
 
@@ -406,12 +406,12 @@ namespace System.Linq
                 await base.DisposeAsync().ConfigureAwait(false);
             }
 
-            protected override async ValueTask<bool> MoveNextCore()
+            protected override async ValueTask<bool> MoveNextCore(CancellationToken cancellationToken)
             {
                 switch (state)
                 {
                     case AsyncIteratorState.Allocated:
-                        enumerator = source.GetAsyncEnumerator();
+                        enumerator = source.GetAsyncEnumerator(cancellationToken);
                         index = -1;
                         state = AsyncIteratorState.Iterating;
                         goto case AsyncIteratorState.Iterating;
@@ -526,7 +526,7 @@ namespace System.Linq
                 return res;
             }
 
-            protected override async ValueTask<bool> MoveNextCore()
+            protected override async ValueTask<bool> MoveNextCore(CancellationToken cancellationToken)
             {
                 switch (state)
                 {

+ 25 - 24
Ix.NET/Source/System.Linq.Async/System/Linq/Operators/SelectMany.cs

@@ -4,6 +4,7 @@
 
 using System.Collections.Generic;
 using System.Diagnostics;
+using System.Threading;
 using System.Threading.Tasks;
 
 namespace System.Linq
@@ -141,12 +142,12 @@ namespace System.Linq
                 await base.DisposeAsync().ConfigureAwait(false);
             }
 
-            protected override async ValueTask<bool> MoveNextCore()
+            protected override async ValueTask<bool> MoveNextCore(CancellationToken cancellationToken)
             {
                 switch (state)
                 {
                     case AsyncIteratorState.Allocated:
-                        sourceEnumerator = source.GetAsyncEnumerator();
+                        sourceEnumerator = source.GetAsyncEnumerator(cancellationToken);
                         mode = State_Source;
                         state = AsyncIteratorState.Iterating;
                         goto case AsyncIteratorState.Iterating;
@@ -163,7 +164,7 @@ namespace System.Linq
                                     }
 
                                     var inner = selector(sourceEnumerator.Current);
-                                    resultEnumerator = inner.GetAsyncEnumerator();
+                                    resultEnumerator = inner.GetAsyncEnumerator(cancellationToken);
 
                                     mode = State_Result;
                                     goto case State_Result;
@@ -232,12 +233,12 @@ namespace System.Linq
                 await base.DisposeAsync().ConfigureAwait(false);
             }
 
-            protected override async ValueTask<bool> MoveNextCore()
+            protected override async ValueTask<bool> MoveNextCore(CancellationToken cancellationToken)
             {
                 switch (state)
                 {
                     case AsyncIteratorState.Allocated:
-                        sourceEnumerator = source.GetAsyncEnumerator();
+                        sourceEnumerator = source.GetAsyncEnumerator(cancellationToken);
                         mode = State_Source;
                         state = AsyncIteratorState.Iterating;
                         goto case AsyncIteratorState.Iterating;
@@ -254,7 +255,7 @@ namespace System.Linq
                                     }
 
                                     var inner = await selector(sourceEnumerator.Current).ConfigureAwait(false);
-                                    resultEnumerator = inner.GetAsyncEnumerator();
+                                    resultEnumerator = inner.GetAsyncEnumerator(cancellationToken);
 
                                     mode = State_Result;
                                     goto case State_Result;
@@ -329,12 +330,12 @@ namespace System.Linq
                 await base.DisposeAsync().ConfigureAwait(false);
             }
 
-            protected override async ValueTask<bool> MoveNextCore()
+            protected override async ValueTask<bool> MoveNextCore(CancellationToken cancellationToken)
             {
                 switch (state)
                 {
                     case AsyncIteratorState.Allocated:
-                        sourceEnumerator = source.GetAsyncEnumerator();
+                        sourceEnumerator = source.GetAsyncEnumerator(cancellationToken);
                         mode = State_Source;
                         state = AsyncIteratorState.Iterating;
                         goto case AsyncIteratorState.Iterating;
@@ -352,7 +353,7 @@ namespace System.Linq
 
                                     currentSource = sourceEnumerator.Current;
                                     var inner = collectionSelector(currentSource);
-                                    resultEnumerator = inner.GetAsyncEnumerator();
+                                    resultEnumerator = inner.GetAsyncEnumerator(cancellationToken);
 
                                     mode = State_Result;
                                     goto case State_Result;
@@ -427,12 +428,12 @@ namespace System.Linq
                 await base.DisposeAsync().ConfigureAwait(false);
             }
 
-            protected override async ValueTask<bool> MoveNextCore()
+            protected override async ValueTask<bool> MoveNextCore(CancellationToken cancellationToken)
             {
                 switch (state)
                 {
                     case AsyncIteratorState.Allocated:
-                        sourceEnumerator = source.GetAsyncEnumerator();
+                        sourceEnumerator = source.GetAsyncEnumerator(cancellationToken);
                         mode = State_Source;
                         state = AsyncIteratorState.Iterating;
                         goto case AsyncIteratorState.Iterating;
@@ -450,7 +451,7 @@ namespace System.Linq
 
                                     currentSource = sourceEnumerator.Current;
                                     var inner = await collectionSelector(currentSource).ConfigureAwait(false);
-                                    resultEnumerator = inner.GetAsyncEnumerator();
+                                    resultEnumerator = inner.GetAsyncEnumerator(cancellationToken);
 
                                     mode = State_Result;
                                     goto case State_Result;
@@ -526,12 +527,12 @@ namespace System.Linq
                 await base.DisposeAsync().ConfigureAwait(false);
             }
 
-            protected override async ValueTask<bool> MoveNextCore()
+            protected override async ValueTask<bool> MoveNextCore(CancellationToken cancellationToken)
             {
                 switch (state)
                 {
                     case AsyncIteratorState.Allocated:
-                        sourceEnumerator = source.GetAsyncEnumerator();
+                        sourceEnumerator = source.GetAsyncEnumerator(cancellationToken);
                         index = -1;
                         mode = State_Source;
                         state = AsyncIteratorState.Iterating;
@@ -556,7 +557,7 @@ namespace System.Linq
                                     }
 
                                     var inner = collectionSelector(currentSource, index);
-                                    resultEnumerator = inner.GetAsyncEnumerator();
+                                    resultEnumerator = inner.GetAsyncEnumerator(cancellationToken);
 
                                     mode = State_Result;
                                     goto case State_Result;
@@ -632,12 +633,12 @@ namespace System.Linq
                 await base.DisposeAsync().ConfigureAwait(false);
             }
 
-            protected override async ValueTask<bool> MoveNextCore()
+            protected override async ValueTask<bool> MoveNextCore(CancellationToken cancellationToken)
             {
                 switch (state)
                 {
                     case AsyncIteratorState.Allocated:
-                        sourceEnumerator = source.GetAsyncEnumerator();
+                        sourceEnumerator = source.GetAsyncEnumerator(cancellationToken);
                         index = -1;
                         mode = State_Source;
                         state = AsyncIteratorState.Iterating;
@@ -662,7 +663,7 @@ namespace System.Linq
                                     }
 
                                     var inner = await collectionSelector(currentSource, index).ConfigureAwait(false);
-                                    resultEnumerator = inner.GetAsyncEnumerator();
+                                    resultEnumerator = inner.GetAsyncEnumerator(cancellationToken);
 
                                     mode = State_Result;
                                     goto case State_Result;
@@ -732,12 +733,12 @@ namespace System.Linq
                 await base.DisposeAsync().ConfigureAwait(false);
             }
 
-            protected override async ValueTask<bool> MoveNextCore()
+            protected override async ValueTask<bool> MoveNextCore(CancellationToken cancellationToken)
             {
                 switch (state)
                 {
                     case AsyncIteratorState.Allocated:
-                        sourceEnumerator = source.GetAsyncEnumerator();
+                        sourceEnumerator = source.GetAsyncEnumerator(cancellationToken);
                         index = -1;
                         mode = State_Source;
                         state = AsyncIteratorState.Iterating;
@@ -760,7 +761,7 @@ namespace System.Linq
                                     }
 
                                     var inner = selector(sourceEnumerator.Current, index);
-                                    resultEnumerator = inner.GetAsyncEnumerator();
+                                    resultEnumerator = inner.GetAsyncEnumerator(cancellationToken);
 
                                     mode = State_Result;
                                     goto case State_Result;
@@ -830,12 +831,12 @@ namespace System.Linq
                 await base.DisposeAsync().ConfigureAwait(false);
             }
 
-            protected override async ValueTask<bool> MoveNextCore()
+            protected override async ValueTask<bool> MoveNextCore(CancellationToken cancellationToken)
             {
                 switch (state)
                 {
                     case AsyncIteratorState.Allocated:
-                        sourceEnumerator = source.GetAsyncEnumerator();
+                        sourceEnumerator = source.GetAsyncEnumerator(cancellationToken);
                         index = -1;
                         mode = State_Source;
                         state = AsyncIteratorState.Iterating;
@@ -858,7 +859,7 @@ namespace System.Linq
                                     }
 
                                     var inner = await selector(sourceEnumerator.Current, index).ConfigureAwait(false);
-                                    resultEnumerator = inner.GetAsyncEnumerator();
+                                    resultEnumerator = inner.GetAsyncEnumerator(cancellationToken);
 
                                     mode = State_Result;
                                     goto case State_Result;

+ 2 - 2
Ix.NET/Source/System.Linq.Async/System/Linq/Operators/SequenceEqual.cs

@@ -88,11 +88,11 @@ namespace System.Linq
                 return false;
             }
 
-            var e1 = first.GetAsyncEnumerator();
+            var e1 = first.GetAsyncEnumerator(cancellationToken);
 
             try
             {
-                var e2 = second.GetAsyncEnumerator();
+                var e2 = second.GetAsyncEnumerator(cancellationToken);
 
                 try
                 {

+ 3 - 3
Ix.NET/Source/System.Linq.Async/System/Linq/Operators/Single.cs

@@ -79,17 +79,17 @@ namespace System.Linq
                 throw new InvalidOperationException(Strings.MORE_THAN_ONE_ELEMENT);
             }
 
-            var e = source.GetAsyncEnumerator();
+            var e = source.GetAsyncEnumerator(cancellationToken);
 
             try
             {
-                if (!await e.MoveNextAsync(cancellationToken).ConfigureAwait(false))
+                if (!await e.MoveNextAsync().ConfigureAwait(false))
                 {
                     throw new InvalidOperationException(Strings.NO_ELEMENTS);
                 }
 
                 var result = e.Current;
-                if (await e.MoveNextAsync(cancellationToken).ConfigureAwait(false))
+                if (await e.MoveNextAsync().ConfigureAwait(false))
                 {
                     throw new InvalidOperationException(Strings.MORE_THAN_ONE_ELEMENT);
                 }

+ 3 - 3
Ix.NET/Source/System.Linq.Async/System/Linq/Operators/SingleOrDefault.cs

@@ -79,17 +79,17 @@ namespace System.Linq
                 throw new InvalidOperationException(Strings.MORE_THAN_ONE_ELEMENT);
             }
 
-            var e = source.GetAsyncEnumerator();
+            var e = source.GetAsyncEnumerator(cancellationToken);
 
             try
             {
-                if (!await e.MoveNextAsync(cancellationToken).ConfigureAwait(false))
+                if (!await e.MoveNextAsync().ConfigureAwait(false))
                 {
                     return default(TSource);
                 }
 
                 var result = e.Current;
-                if (!await e.MoveNextAsync(cancellationToken).ConfigureAwait(false))
+                if (!await e.MoveNextAsync().ConfigureAwait(false))
                 {
                     return result;
                 }

+ 3 - 2
Ix.NET/Source/System.Linq.Async/System/Linq/Operators/SkipLast.cs

@@ -4,6 +4,7 @@
 
 using System.Collections.Generic;
 using System.Diagnostics;
+using System.Threading;
 using System.Threading.Tasks;
 
 namespace System.Linq
@@ -65,12 +66,12 @@ namespace System.Linq
             }
 
 
-            protected override async ValueTask<bool> MoveNextCore()
+            protected override async ValueTask<bool> MoveNextCore(CancellationToken cancellationToken)
             {
                 switch (state)
                 {
                     case AsyncIteratorState.Allocated:
-                        enumerator = source.GetAsyncEnumerator();
+                        enumerator = source.GetAsyncEnumerator(cancellationToken);
                         queue = new Queue<TSource>();
 
                         state = AsyncIteratorState.Iterating;

+ 9 - 8
Ix.NET/Source/System.Linq.Async/System/Linq/Operators/SkipWhile.cs

@@ -4,6 +4,7 @@
 
 using System.Collections.Generic;
 using System.Diagnostics;
+using System.Threading;
 using System.Threading.Tasks;
 
 namespace System.Linq
@@ -83,12 +84,12 @@ namespace System.Linq
                 await base.DisposeAsync().ConfigureAwait(false);
             }
 
-            protected override async ValueTask<bool> MoveNextCore()
+            protected override async ValueTask<bool> MoveNextCore(CancellationToken cancellationToken)
             {
                 switch (state)
                 {
                     case AsyncIteratorState.Allocated:
-                        enumerator = source.GetAsyncEnumerator();
+                        enumerator = source.GetAsyncEnumerator(cancellationToken);
 
                         // skip elements as requested
                         while (await enumerator.MoveNextAsync().ConfigureAwait(false))
@@ -160,12 +161,12 @@ namespace System.Linq
                 await base.DisposeAsync().ConfigureAwait(false);
             }
 
-            protected override async ValueTask<bool> MoveNextCore()
+            protected override async ValueTask<bool> MoveNextCore(CancellationToken cancellationToken)
             {
                 switch (state)
                 {
                     case AsyncIteratorState.Allocated:
-                        enumerator = source.GetAsyncEnumerator();
+                        enumerator = source.GetAsyncEnumerator(cancellationToken);
                         index = -1;
 
                         // skip elements as requested
@@ -242,12 +243,12 @@ namespace System.Linq
                 await base.DisposeAsync().ConfigureAwait(false);
             }
 
-            protected override async ValueTask<bool> MoveNextCore()
+            protected override async ValueTask<bool> MoveNextCore(CancellationToken cancellationToken)
             {
                 switch (state)
                 {
                     case AsyncIteratorState.Allocated:
-                        enumerator = source.GetAsyncEnumerator();
+                        enumerator = source.GetAsyncEnumerator(cancellationToken);
 
                         // skip elements as requested
                         while (await enumerator.MoveNextAsync().ConfigureAwait(false))
@@ -319,12 +320,12 @@ namespace System.Linq
                 await base.DisposeAsync().ConfigureAwait(false);
             }
 
-            protected override async ValueTask<bool> MoveNextCore()
+            protected override async ValueTask<bool> MoveNextCore(CancellationToken cancellationToken)
             {
                 switch (state)
                 {
                     case AsyncIteratorState.Allocated:
-                        enumerator = source.GetAsyncEnumerator();
+                        enumerator = source.GetAsyncEnumerator(cancellationToken);
                         index = -1;
 
                         // skip elements as requested

+ 3 - 2
Ix.NET/Source/System.Linq.Async/System/Linq/Operators/TakeLast.cs

@@ -4,6 +4,7 @@
 
 using System.Collections.Generic;
 using System.Diagnostics;
+using System.Threading;
 using System.Threading.Tasks;
 
 namespace System.Linq
@@ -58,12 +59,12 @@ namespace System.Linq
                 await base.DisposeAsync().ConfigureAwait(false);
             }
 
-            protected override async ValueTask<bool> MoveNextCore()
+            protected override async ValueTask<bool> MoveNextCore(CancellationToken cancellationToken)
             {
                 switch (state)
                 {
                     case AsyncIteratorState.Allocated:
-                        enumerator = source.GetAsyncEnumerator();
+                        enumerator = source.GetAsyncEnumerator(cancellationToken);
                         queue = new Queue<TSource>();
                         isDone = false;
 

+ 9 - 8
Ix.NET/Source/System.Linq.Async/System/Linq/Operators/TakeWhile.cs

@@ -4,6 +4,7 @@
 
 using System.Collections.Generic;
 using System.Diagnostics;
+using System.Threading;
 using System.Threading.Tasks;
 
 namespace System.Linq
@@ -82,12 +83,12 @@ namespace System.Linq
                 await base.DisposeAsync().ConfigureAwait(false);
             }
 
-            protected override async ValueTask<bool> MoveNextCore()
+            protected override async ValueTask<bool> MoveNextCore(CancellationToken cancellationToken)
             {
                 switch (state)
                 {
                     case AsyncIteratorState.Allocated:
-                        enumerator = source.GetAsyncEnumerator();
+                        enumerator = source.GetAsyncEnumerator(cancellationToken);
 
                         state = AsyncIteratorState.Iterating;
                         goto case AsyncIteratorState.Iterating;
@@ -147,12 +148,12 @@ namespace System.Linq
                 await base.DisposeAsync().ConfigureAwait(false);
             }
 
-            protected override async ValueTask<bool> MoveNextCore()
+            protected override async ValueTask<bool> MoveNextCore(CancellationToken cancellationToken)
             {
                 switch (state)
                 {
                     case AsyncIteratorState.Allocated:
-                        enumerator = source.GetAsyncEnumerator();
+                        enumerator = source.GetAsyncEnumerator(cancellationToken);
                         index = -1;
                         state = AsyncIteratorState.Iterating;
                         goto case AsyncIteratorState.Iterating;
@@ -216,12 +217,12 @@ namespace System.Linq
                 await base.DisposeAsync().ConfigureAwait(false);
             }
 
-            protected override async ValueTask<bool> MoveNextCore()
+            protected override async ValueTask<bool> MoveNextCore(CancellationToken cancellationToken)
             {
                 switch (state)
                 {
                     case AsyncIteratorState.Allocated:
-                        enumerator = source.GetAsyncEnumerator();
+                        enumerator = source.GetAsyncEnumerator(cancellationToken);
 
                         state = AsyncIteratorState.Iterating;
                         goto case AsyncIteratorState.Iterating;
@@ -281,12 +282,12 @@ namespace System.Linq
                 await base.DisposeAsync().ConfigureAwait(false);
             }
 
-            protected override async ValueTask<bool> MoveNextCore()
+            protected override async ValueTask<bool> MoveNextCore(CancellationToken cancellationToken)
             {
                 switch (state)
                 {
                     case AsyncIteratorState.Allocated:
-                        enumerator = source.GetAsyncEnumerator();
+                        enumerator = source.GetAsyncEnumerator(cancellationToken);
                         index = -1;
                         state = AsyncIteratorState.Iterating;
                         goto case AsyncIteratorState.Iterating;

+ 3 - 3
Ix.NET/Source/System.Linq.Async/System/Linq/Operators/ToAsyncEnumerable.cs

@@ -147,7 +147,7 @@ namespace System.Linq
                 await base.DisposeAsync().ConfigureAwait(false);
             }
 
-            protected override async ValueTask<bool> MoveNextCore()
+            protected override async ValueTask<bool> MoveNextCore(CancellationToken cancellationToken)
             {
                 switch (state)
                 {
@@ -216,7 +216,7 @@ namespace System.Linq
                 await base.DisposeAsync().ConfigureAwait(false);
             }
 
-            protected override async ValueTask<bool> MoveNextCore()
+            protected override async ValueTask<bool> MoveNextCore(CancellationToken cancellationToken)
             {
                 switch (state)
                 {
@@ -319,7 +319,7 @@ namespace System.Linq
                 await base.DisposeAsync().ConfigureAwait(false);
             }
 
-            protected override async ValueTask<bool> MoveNextCore()
+            protected override async ValueTask<bool> MoveNextCore(CancellationToken cancellationToken)
             {
                 switch (state)
                 {

+ 1 - 1
Ix.NET/Source/System.Linq.Async/System/Linq/Operators/ToEnumerable.cs

@@ -18,7 +18,7 @@ namespace System.Linq
 
         private static IEnumerable<TSource> ToEnumerableCore<TSource>(IAsyncEnumerable<TSource> source)
         {
-            var e = source.GetAsyncEnumerator();
+            var e = source.GetAsyncEnumerator(default);
 
             try
             {

+ 16 - 16
Ix.NET/Source/System.Linq.Async/System/Linq/Operators/ToLookup.cs

@@ -293,11 +293,11 @@ namespace System.Linq.Internal
 
             var lookup = new Lookup<TKey, TElement>(comparer);
 
-            var enu = source.GetAsyncEnumerator();
+            var enu = source.GetAsyncEnumerator(cancellationToken);
 
             try
             {
-                while (await enu.MoveNextAsync(cancellationToken).ConfigureAwait(false))
+                while (await enu.MoveNextAsync().ConfigureAwait(false))
                 {
                     var key = keySelector(enu.Current);
                     var group = lookup.GetGrouping(key, create: true);
@@ -321,11 +321,11 @@ namespace System.Linq.Internal
 
             var lookup = new Lookup<TKey, TElement>(comparer);
 
-            var enu = source.GetAsyncEnumerator();
+            var enu = source.GetAsyncEnumerator(cancellationToken);
 
             try
             {
-                while (await enu.MoveNextAsync(cancellationToken).ConfigureAwait(false))
+                while (await enu.MoveNextAsync().ConfigureAwait(false))
                 {
                     var key = keySelector(enu.Current);
                     lookup.GetGrouping(key, create: true).Add(enu.Current);
@@ -339,11 +339,11 @@ namespace System.Linq.Internal
             return lookup;
         }
 
-        internal static async Task<Lookup<TKey, TElement>> CreateForJoinAsync(IAsyncEnumerable<TElement> source, Func<TElement, TKey> keySelector, IEqualityComparer<TKey> comparer)
+        internal static async Task<Lookup<TKey, TElement>> CreateForJoinAsync(IAsyncEnumerable<TElement> source, Func<TElement, TKey> keySelector, IEqualityComparer<TKey> comparer, CancellationToken cancellationToken)
         {
             var lookup = new Lookup<TKey, TElement>(comparer);
 
-            var enu = source.GetAsyncEnumerator();
+            var enu = source.GetAsyncEnumerator(cancellationToken);
 
             try
             {
@@ -474,9 +474,9 @@ namespace System.Linq.Internal
             return Task.FromResult(Count);
         }
 
-        IAsyncEnumerator<IAsyncGrouping<TKey, TElement>> IAsyncEnumerable<IAsyncGrouping<TKey, TElement>>.GetAsyncEnumerator()
+        IAsyncEnumerator<IAsyncGrouping<TKey, TElement>> IAsyncEnumerable<IAsyncGrouping<TKey, TElement>>.GetAsyncEnumerator(CancellationToken cancellationToken)
         {
-            return Enumerable.Cast<IAsyncGrouping<TKey, TElement>>(this).ToAsyncEnumerable().GetAsyncEnumerator();
+            return Enumerable.Cast<IAsyncGrouping<TKey, TElement>>(this).ToAsyncEnumerable().GetAsyncEnumerator(cancellationToken);
         }
 
         Task<List<IAsyncGrouping<TKey, TElement>>> IAsyncIListProvider<IAsyncGrouping<TKey, TElement>>.ToListAsync(CancellationToken cancellationToken)
@@ -579,11 +579,11 @@ namespace System.Linq.Internal
 
             var lookup = new LookupWithTask<TKey, TElement>(comparer);
 
-            var enu = source.GetAsyncEnumerator();
+            var enu = source.GetAsyncEnumerator(cancellationToken);
 
             try
             {
-                while (await enu.MoveNextAsync(cancellationToken).ConfigureAwait(false))
+                while (await enu.MoveNextAsync().ConfigureAwait(false))
                 {
                     var key = await keySelector(enu.Current).ConfigureAwait(false);
                     var group = lookup.GetGrouping(key, create: true);
@@ -607,11 +607,11 @@ namespace System.Linq.Internal
 
             var lookup = new LookupWithTask<TKey, TElement>(comparer);
 
-            var enu = source.GetAsyncEnumerator();
+            var enu = source.GetAsyncEnumerator(cancellationToken);
 
             try
             {
-                while (await enu.MoveNextAsync(cancellationToken).ConfigureAwait(false))
+                while (await enu.MoveNextAsync().ConfigureAwait(false))
                 {
                     var key = await keySelector(enu.Current).ConfigureAwait(false);
                     lookup.GetGrouping(key, create: true).Add(enu.Current);
@@ -625,11 +625,11 @@ namespace System.Linq.Internal
             return lookup;
         }
 
-        internal static async Task<LookupWithTask<TKey, TElement>> CreateForJoinAsync(IAsyncEnumerable<TElement> source, Func<TElement, Task<TKey>> keySelector, IEqualityComparer<TKey> comparer)
+        internal static async Task<LookupWithTask<TKey, TElement>> CreateForJoinAsync(IAsyncEnumerable<TElement> source, Func<TElement, Task<TKey>> keySelector, IEqualityComparer<TKey> comparer, CancellationToken cancellationToken)
         {
             var lookup = new LookupWithTask<TKey, TElement>(comparer);
 
-            var enu = source.GetAsyncEnumerator();
+            var enu = source.GetAsyncEnumerator(cancellationToken);
 
             try
             {
@@ -760,9 +760,9 @@ namespace System.Linq.Internal
             return Task.FromResult(Count);
         }
 
-        IAsyncEnumerator<IAsyncGrouping<TKey, TElement>> IAsyncEnumerable<IAsyncGrouping<TKey, TElement>>.GetAsyncEnumerator()
+        IAsyncEnumerator<IAsyncGrouping<TKey, TElement>> IAsyncEnumerable<IAsyncGrouping<TKey, TElement>>.GetAsyncEnumerator(CancellationToken cancellationToken)
         {
-            return Enumerable.Cast<IAsyncGrouping<TKey, TElement>>(this).ToAsyncEnumerable().GetAsyncEnumerator();
+            return Enumerable.Cast<IAsyncGrouping<TKey, TElement>>(this).ToAsyncEnumerable().GetAsyncEnumerator(cancellationToken);
         }
 
         Task<List<IAsyncGrouping<TKey, TElement>>> IAsyncIListProvider<IAsyncGrouping<TKey, TElement>>.ToListAsync(CancellationToken cancellationToken)

+ 1 - 1
Ix.NET/Source/System.Linq.Async/System/Linq/Operators/ToObservable.cs

@@ -28,7 +28,7 @@ namespace System.Linq
             public IDisposable Subscribe(IObserver<T> observer)
             {
                 var ctd = new CancellationTokenDisposable();
-                var e = source.GetAsyncEnumerator();
+                var e = source.GetAsyncEnumerator(ctd.Token);
 
                 var f = default(Action);
                 f = () => e.MoveNextAsync().AsTask().ContinueWith(

+ 4 - 4
Ix.NET/Source/System.Linq.Async/System/Linq/Operators/Union.cs

@@ -107,7 +107,7 @@ namespace System.Linq
                 return false;
             }
 
-            protected sealed override async ValueTask<bool> MoveNextCore()
+            protected sealed override async ValueTask<bool> MoveNextCore(CancellationToken cancellationToken)
             {
                 switch (state)
                 {
@@ -118,7 +118,7 @@ namespace System.Linq
                         {
                             ++_index;
 
-                            var enumerator = enumerable.GetAsyncEnumerator();
+                            var enumerator = enumerable.GetAsyncEnumerator(cancellationToken);
 
                             if (await enumerator.MoveNextAsync().ConfigureAwait(false))
                             {
@@ -146,7 +146,7 @@ namespace System.Linq
                                 break;
                             }
 
-                            await SetEnumeratorAsync(enumerable.GetAsyncEnumerator()).ConfigureAwait(false);
+                            await SetEnumeratorAsync(enumerable.GetAsyncEnumerator(cancellationToken)).ConfigureAwait(false);
                             ++_index;
                         }
 
@@ -169,7 +169,7 @@ namespace System.Linq
                         return set;
                     }
 
-                    await set.UnionWithAsync(enumerable);
+                    await set.UnionWithAsync(enumerable, cancellationToken);
                 }
             }
 

+ 11 - 10
Ix.NET/Source/System.Linq.Async/System/Linq/Operators/Where.cs

@@ -4,6 +4,7 @@
 
 using System.Collections.Generic;
 using System.Diagnostics;
+using System.Threading;
 using System.Threading.Tasks;
 
 namespace System.Linq
@@ -123,12 +124,12 @@ namespace System.Linq
                 return new WhereEnumerableAsyncIterator<TSource>(source, CombinePredicates(this.predicate, predicate));
             }
 
-            protected override async ValueTask<bool> MoveNextCore()
+            protected override async ValueTask<bool> MoveNextCore(CancellationToken cancellationToken)
             {
                 switch (state)
                 {
                     case AsyncIteratorState.Allocated:
-                        enumerator = source.GetAsyncEnumerator();
+                        enumerator = source.GetAsyncEnumerator(cancellationToken);
                         state = AsyncIteratorState.Iterating;
                         goto case AsyncIteratorState.Iterating;
 
@@ -184,12 +185,12 @@ namespace System.Linq
                 await base.DisposeAsync().ConfigureAwait(false);
             }
 
-            protected override async ValueTask<bool> MoveNextCore()
+            protected override async ValueTask<bool> MoveNextCore(CancellationToken cancellationToken)
             {
                 switch (state)
                 {
                     case AsyncIteratorState.Allocated:
-                        enumerator = source.GetAsyncEnumerator();
+                        enumerator = source.GetAsyncEnumerator(cancellationToken);
                         index = -1;
                         state = AsyncIteratorState.Iterating;
                         goto case AsyncIteratorState.Iterating;
@@ -253,12 +254,12 @@ namespace System.Linq
                 return new WhereEnumerableAsyncIteratorWithTask<TSource>(source, CombinePredicates(this.predicate, predicate));
             }
 
-            protected override async ValueTask<bool> MoveNextCore()
+            protected override async ValueTask<bool> MoveNextCore(CancellationToken cancellationToken)
             {
                 switch (state)
                 {
                     case AsyncIteratorState.Allocated:
-                        enumerator = source.GetAsyncEnumerator();
+                        enumerator = source.GetAsyncEnumerator(cancellationToken);
                         state = AsyncIteratorState.Iterating;
                         goto case AsyncIteratorState.Iterating;
 
@@ -314,12 +315,12 @@ namespace System.Linq
                 await base.DisposeAsync().ConfigureAwait(false);
             }
 
-            protected override async ValueTask<bool> MoveNextCore()
+            protected override async ValueTask<bool> MoveNextCore(CancellationToken cancellationToken)
             {
                 switch (state)
                 {
                     case AsyncIteratorState.Allocated:
-                        enumerator = source.GetAsyncEnumerator();
+                        enumerator = source.GetAsyncEnumerator(cancellationToken);
                         index = -1;
                         state = AsyncIteratorState.Iterating;
                         goto case AsyncIteratorState.Iterating;
@@ -387,12 +388,12 @@ namespace System.Linq
                 return new WhereSelectEnumerableAsyncIterator<TSource, TResult1>(source, predicate, CombineSelectors(this.selector, selector));
             }
 
-            protected override async ValueTask<bool> MoveNextCore()
+            protected override async ValueTask<bool> MoveNextCore(CancellationToken cancellationToken)
             {
                 switch (state)
                 {
                     case AsyncIteratorState.Allocated:
-                        enumerator = source.GetAsyncEnumerator();
+                        enumerator = source.GetAsyncEnumerator(cancellationToken);
                         state = AsyncIteratorState.Iterating;
                         goto case AsyncIteratorState.Iterating;
 

+ 7 - 6
Ix.NET/Source/System.Linq.Async/System/Linq/Operators/Zip.cs

@@ -4,6 +4,7 @@
 
 using System.Collections.Generic;
 using System.Diagnostics;
+using System.Threading;
 using System.Threading.Tasks;
 
 namespace System.Linq
@@ -76,13 +77,13 @@ namespace System.Linq
                 await base.DisposeAsync().ConfigureAwait(false);
             }
 
-            protected override async ValueTask<bool> MoveNextCore()
+            protected override async ValueTask<bool> MoveNextCore(CancellationToken cancellationToken)
             {
                 switch (state)
                 {
                     case AsyncIteratorState.Allocated:
-                        firstEnumerator = first.GetAsyncEnumerator();
-                        secondEnumerator = second.GetAsyncEnumerator();
+                        firstEnumerator = first.GetAsyncEnumerator(cancellationToken);
+                        secondEnumerator = second.GetAsyncEnumerator(cancellationToken);
 
                         state = AsyncIteratorState.Iterating;
                         goto case AsyncIteratorState.Iterating;
@@ -151,13 +152,13 @@ namespace System.Linq
                 await base.DisposeAsync().ConfigureAwait(false);
             }
 
-            protected override async ValueTask<bool> MoveNextCore()
+            protected override async ValueTask<bool> MoveNextCore(CancellationToken cancellationToken)
             {
                 switch (state)
                 {
                     case AsyncIteratorState.Allocated:
-                        firstEnumerator = first.GetAsyncEnumerator();
-                        secondEnumerator = second.GetAsyncEnumerator();
+                        firstEnumerator = first.GetAsyncEnumerator(cancellationToken);
+                        secondEnumerator = second.GetAsyncEnumerator(cancellationToken);
 
                         state = AsyncIteratorState.Iterating;
                         goto case AsyncIteratorState.Iterating;

+ 3 - 2
Ix.NET/Source/System.Linq.Async/System/Linq/Set.cs

@@ -7,6 +7,7 @@
 using System.Collections.Generic;
 using System.Diagnostics;
 using System.Diagnostics.CodeAnalysis;
+using System.Threading;
 using System.Threading.Tasks;
 
 namespace System.Linq
@@ -144,9 +145,9 @@ namespace System.Linq
             _slots = newSlots;
         }
 
-        public async Task UnionWithAsync(IAsyncEnumerable<TElement> other)
+        public async Task UnionWithAsync(IAsyncEnumerable<TElement> other, CancellationToken cancellationToken)
         {
-            var enu = other.GetAsyncEnumerator();
+            var enu = other.GetAsyncEnumerator(cancellationToken);
 
             try
             {

+ 2 - 2
Ix.NET/Source/System.Linq.Async/System/Threading/Tasks/AsyncEnumerableExtensions.cs

@@ -30,8 +30,8 @@ namespace System.Threading.Tasks
                     _continueOnCapturedContext = continueOnCapturedContext;
                 }
 
-                public ConfiguredAsyncEnumerator GetAsyncEnumerator() =>
-                    new ConfiguredAsyncEnumerator(_enumerable.GetAsyncEnumerator(), _continueOnCapturedContext);
+                public ConfiguredAsyncEnumerator GetAsyncEnumerator(CancellationToken cancellationToken) =>
+                    new ConfiguredAsyncEnumerator(_enumerable.GetAsyncEnumerator(cancellationToken), _continueOnCapturedContext);
 
                 public struct ConfiguredAsyncEnumerator
                 {