Ver código fonte

Adding ref assemblies

Bart De Smet 7 anos atrás
pai
commit
c56e02c33a
19 arquivos alterados com 1195 adições e 4 exclusões
  1. 210 0
      Ix.NET/Source/System.Interactive.Async.Providers/System/Linq/AsyncQueryableEx.Generated.cs
  2. 1 1
      Ix.NET/Source/System.Interactive.Async/System.Interactive.Async.csproj
  3. 108 0
      Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/Catch.cs
  4. 68 1
      Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/Defer.cs
  5. 6 0
      Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/Distinct.cs
  6. 106 0
      Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/DistinctUntilChanged.cs
  7. 140 0
      Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/Do.cs
  8. 104 0
      Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/Expand.cs
  9. 2 1
      Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/Finally.cs
  10. 2 0
      Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/Generate.cs
  11. 36 0
      Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/MaxBy.cs
  12. 78 0
      Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/MinBy.cs
  13. 167 0
      Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/Scan.cs
  14. 83 0
      Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/Using.cs
  15. 0 1
      Ix.NET/Source/refs/Directory.build.props
  16. 19 0
      Ix.NET/Source/refs/System.Interactive.Async.Providers.Ref/System.Interactive.Async.Providers.Ref.csproj
  17. 23 0
      Ix.NET/Source/refs/System.Interactive.Async.Ref/System.Interactive.Async.Ref.csproj
  18. 23 0
      Ix.NET/Source/refs/System.Linq.Async.Queryable.Ref/System.Linq.Async.Queryable.Ref.csproj
  19. 19 0
      Ix.NET/Source/refs/System.Linq.Async.Ref/System.Linq.Async.Ref.csproj

+ 210 - 0
Ix.NET/Source/System.Interactive.Async.Providers/System/Linq/AsyncQueryableEx.Generated.cs

@@ -64,6 +64,20 @@ namespace System.Linq
 #endif
         }
 
+        public static IAsyncQueryable<TSource> Catch<TSource, TException>(this IAsyncQueryable<TSource> source, Expression<Func<TException, CancellationToken, ValueTask<IAsyncEnumerable<TSource>>>> handler)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (handler == null)
+                throw new ArgumentNullException(nameof(handler));
+
+#if CRIPPLED_REFLECTION
+            return source.Provider.CreateQuery<TSource>(Expression.Call(InfoOf(() => AsyncQueryableEx.Catch<TSource, TException>(default(IAsyncQueryable<TSource>), default(Expression<Func<TException, CancellationToken, ValueTask<IAsyncEnumerable<TSource>>>>))), source.Expression, handler));
+#else
+            return source.Provider.CreateQuery<TSource>(Expression.Call(((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(TSource), typeof(TException)), source.Expression, handler));
+#endif
+        }
+
         public static IAsyncQueryable<TSource> Catch<TSource, TException>(this IAsyncQueryable<TSource> source, Expression<Func<TException, IAsyncEnumerable<TSource>>> handler)
         {
             if (source == null)
@@ -190,6 +204,20 @@ namespace System.Linq
 #endif
         }
 
+        public static IAsyncQueryable<TSource> DistinctUntilChanged<TSource, TKey>(this IAsyncQueryable<TSource> source, Expression<Func<TSource, CancellationToken, ValueTask<TKey>>> keySelector)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (keySelector == null)
+                throw new ArgumentNullException(nameof(keySelector));
+
+#if CRIPPLED_REFLECTION
+            return source.Provider.CreateQuery<TSource>(Expression.Call(InfoOf(() => AsyncQueryableEx.DistinctUntilChanged<TSource, TKey>(default(IAsyncQueryable<TSource>), default(Expression<Func<TSource, CancellationToken, ValueTask<TKey>>>))), source.Expression, keySelector));
+#else
+            return source.Provider.CreateQuery<TSource>(Expression.Call(((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(TSource), typeof(TKey)), source.Expression, keySelector));
+#endif
+        }
+
         public static IAsyncQueryable<TSource> DistinctUntilChanged<TSource, TKey>(this IAsyncQueryable<TSource> source, Expression<Func<TSource, TKey>> keySelector)
         {
             if (source == null)
@@ -218,6 +246,22 @@ namespace System.Linq
 #endif
         }
 
+        public static IAsyncQueryable<TSource> DistinctUntilChanged<TSource, TKey>(this IAsyncQueryable<TSource> source, Expression<Func<TSource, CancellationToken, ValueTask<TKey>>> keySelector, IEqualityComparer<TKey> comparer)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (keySelector == null)
+                throw new ArgumentNullException(nameof(keySelector));
+            if (comparer == null)
+                throw new ArgumentNullException(nameof(comparer));
+
+#if CRIPPLED_REFLECTION
+            return source.Provider.CreateQuery<TSource>(Expression.Call(InfoOf(() => AsyncQueryableEx.DistinctUntilChanged<TSource, TKey>(default(IAsyncQueryable<TSource>), default(Expression<Func<TSource, CancellationToken, ValueTask<TKey>>>), default(IEqualityComparer<TKey>))), source.Expression, keySelector, Expression.Constant(comparer, typeof(IEqualityComparer<TKey>))));
+#else
+            return source.Provider.CreateQuery<TSource>(Expression.Call(((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(TSource), typeof(TKey)), source.Expression, keySelector, Expression.Constant(comparer, typeof(IEqualityComparer<TKey>))));
+#endif
+        }
+
         public static IAsyncQueryable<TSource> DistinctUntilChanged<TSource, TKey>(this IAsyncQueryable<TSource> source, Expression<Func<TSource, TKey>> keySelector, IEqualityComparer<TKey> comparer)
         {
             if (source == null)
@@ -264,6 +308,20 @@ namespace System.Linq
 #endif
         }
 
+        public static IAsyncQueryable<TSource> Do<TSource>(this IAsyncQueryable<TSource> source, Expression<Func<TSource, CancellationToken, Task>> onNext)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (onNext == null)
+                throw new ArgumentNullException(nameof(onNext));
+
+#if CRIPPLED_REFLECTION
+            return source.Provider.CreateQuery<TSource>(Expression.Call(InfoOf(() => AsyncQueryableEx.Do<TSource>(default(IAsyncQueryable<TSource>), default(Expression<Func<TSource, CancellationToken, Task>>))), source.Expression, onNext));
+#else
+            return source.Provider.CreateQuery<TSource>(Expression.Call(((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(TSource)), source.Expression, onNext));
+#endif
+        }
+
         public static IAsyncQueryable<TSource> Do<TSource>(this IAsyncQueryable<TSource> source, Expression<Func<TSource, Task>> onNext)
         {
             if (source == null)
@@ -324,6 +382,38 @@ namespace System.Linq
 #endif
         }
 
+        public static IAsyncQueryable<TSource> Do<TSource>(this IAsyncQueryable<TSource> source, Expression<Func<TSource, CancellationToken, Task>> onNext, Expression<Func<CancellationToken, Task>> onCompleted)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (onNext == null)
+                throw new ArgumentNullException(nameof(onNext));
+            if (onCompleted == null)
+                throw new ArgumentNullException(nameof(onCompleted));
+
+#if CRIPPLED_REFLECTION
+            return source.Provider.CreateQuery<TSource>(Expression.Call(InfoOf(() => AsyncQueryableEx.Do<TSource>(default(IAsyncQueryable<TSource>), default(Expression<Func<TSource, CancellationToken, Task>>), default(Expression<Func<CancellationToken, Task>>))), source.Expression, onNext, onCompleted));
+#else
+            return source.Provider.CreateQuery<TSource>(Expression.Call(((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(TSource)), source.Expression, onNext, onCompleted));
+#endif
+        }
+
+        public static IAsyncQueryable<TSource> Do<TSource>(this IAsyncQueryable<TSource> source, Expression<Func<TSource, CancellationToken, Task>> onNext, Expression<Func<Exception, CancellationToken, Task>> onError)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (onNext == null)
+                throw new ArgumentNullException(nameof(onNext));
+            if (onError == null)
+                throw new ArgumentNullException(nameof(onError));
+
+#if CRIPPLED_REFLECTION
+            return source.Provider.CreateQuery<TSource>(Expression.Call(InfoOf(() => AsyncQueryableEx.Do<TSource>(default(IAsyncQueryable<TSource>), default(Expression<Func<TSource, CancellationToken, Task>>), default(Expression<Func<Exception, CancellationToken, Task>>))), source.Expression, onNext, onError));
+#else
+            return source.Provider.CreateQuery<TSource>(Expression.Call(((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(TSource)), source.Expression, onNext, onError));
+#endif
+        }
+
         public static IAsyncQueryable<TSource> Do<TSource>(this IAsyncQueryable<TSource> source, Expression<Func<TSource, Task>> onNext, Expression<Func<Exception, Task>> onError)
         {
             if (source == null)
@@ -374,6 +464,24 @@ namespace System.Linq
 #endif
         }
 
+        public static IAsyncQueryable<TSource> Do<TSource>(this IAsyncQueryable<TSource> source, Expression<Func<TSource, CancellationToken, Task>> onNext, Expression<Func<Exception, CancellationToken, Task>> onError, Expression<Func<CancellationToken, Task>> onCompleted)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (onNext == null)
+                throw new ArgumentNullException(nameof(onNext));
+            if (onError == null)
+                throw new ArgumentNullException(nameof(onError));
+            if (onCompleted == null)
+                throw new ArgumentNullException(nameof(onCompleted));
+
+#if CRIPPLED_REFLECTION
+            return source.Provider.CreateQuery<TSource>(Expression.Call(InfoOf(() => AsyncQueryableEx.Do<TSource>(default(IAsyncQueryable<TSource>), default(Expression<Func<TSource, CancellationToken, Task>>), default(Expression<Func<Exception, CancellationToken, Task>>), default(Expression<Func<CancellationToken, Task>>))), source.Expression, onNext, onError, onCompleted));
+#else
+            return source.Provider.CreateQuery<TSource>(Expression.Call(((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(TSource)), source.Expression, onNext, onError, onCompleted));
+#endif
+        }
+
         public static IAsyncQueryable<TSource> Do<TSource>(this IAsyncQueryable<TSource> source, Expression<Func<TSource, Task>> onNext, Expression<Func<Exception, Task>> onError, Expression<Func<Task>> onCompleted)
         {
             if (source == null)
@@ -392,6 +500,20 @@ namespace System.Linq
 #endif
         }
 
+        public static IAsyncQueryable<TSource> Expand<TSource>(this IAsyncQueryable<TSource> source, Expression<Func<TSource, CancellationToken, ValueTask<IAsyncEnumerable<TSource>>>> selector)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (selector == null)
+                throw new ArgumentNullException(nameof(selector));
+
+#if CRIPPLED_REFLECTION
+            return source.Provider.CreateQuery<TSource>(Expression.Call(InfoOf(() => AsyncQueryableEx.Expand<TSource>(default(IAsyncQueryable<TSource>), default(Expression<Func<TSource, CancellationToken, ValueTask<IAsyncEnumerable<TSource>>>>))), source.Expression, selector));
+#else
+            return source.Provider.CreateQuery<TSource>(Expression.Call(((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(TSource)), source.Expression, selector));
+#endif
+        }
+
         public static IAsyncQueryable<TSource> Expand<TSource>(this IAsyncQueryable<TSource> source, Expression<Func<TSource, IAsyncEnumerable<TSource>>> selector)
         {
             if (source == null)
@@ -540,6 +662,20 @@ namespace System.Linq
 #endif
         }
 
+        public static Task<IList<TSource>> MaxByAsync<TSource, TKey>(this IAsyncQueryable<TSource> source, Expression<Func<TSource, CancellationToken, ValueTask<TKey>>> keySelector, CancellationToken cancellationToken)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (keySelector == null)
+                throw new ArgumentNullException(nameof(keySelector));
+
+#if CRIPPLED_REFLECTION
+            return source.Provider.ExecuteAsync<IList<TSource>>(Expression.Call(InfoOf(() => AsyncQueryableEx.MaxByAsync<TSource, TKey>(default(IAsyncQueryable<TSource>), default(Expression<Func<TSource, CancellationToken, ValueTask<TKey>>>), default(CancellationToken))), source.Expression, keySelector, Expression.Constant(cancellationToken, typeof(CancellationToken))), cancellationToken);
+#else
+            return source.Provider.ExecuteAsync<IList<TSource>>(Expression.Call(((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(TSource), typeof(TKey)), source.Expression, keySelector, Expression.Constant(cancellationToken, typeof(CancellationToken))), cancellationToken);
+#endif
+        }
+
         public static Task<IList<TSource>> MaxByAsync<TSource, TKey>(this IAsyncQueryable<TSource> source, Expression<Func<TSource, TKey>> keySelector, CancellationToken cancellationToken)
         {
             if (source == null)
@@ -600,6 +736,22 @@ namespace System.Linq
 #endif
         }
 
+        public static Task<IList<TSource>> MaxByAsync<TSource, TKey>(this IAsyncQueryable<TSource> source, Expression<Func<TSource, CancellationToken, ValueTask<TKey>>> keySelector, IComparer<TKey> comparer, CancellationToken cancellationToken)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (keySelector == null)
+                throw new ArgumentNullException(nameof(keySelector));
+            if (comparer == null)
+                throw new ArgumentNullException(nameof(comparer));
+
+#if CRIPPLED_REFLECTION
+            return source.Provider.ExecuteAsync<IList<TSource>>(Expression.Call(InfoOf(() => AsyncQueryableEx.MaxByAsync<TSource, TKey>(default(IAsyncQueryable<TSource>), default(Expression<Func<TSource, CancellationToken, ValueTask<TKey>>>), default(IComparer<TKey>), default(CancellationToken))), source.Expression, keySelector, Expression.Constant(comparer, typeof(IComparer<TKey>)), Expression.Constant(cancellationToken, typeof(CancellationToken))), cancellationToken);
+#else
+            return source.Provider.ExecuteAsync<IList<TSource>>(Expression.Call(((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(TSource), typeof(TKey)), source.Expression, keySelector, Expression.Constant(comparer, typeof(IComparer<TKey>)), Expression.Constant(cancellationToken, typeof(CancellationToken))), cancellationToken);
+#endif
+        }
+
         public static Task<IList<TSource>> MaxByAsync<TSource, TKey>(this IAsyncQueryable<TSource> source, Expression<Func<TSource, TKey>> keySelector, IComparer<TKey> comparer, CancellationToken cancellationToken)
         {
             if (source == null)
@@ -700,6 +852,20 @@ namespace System.Linq
 #endif
         }
 
+        public static Task<IList<TSource>> MinByAsync<TSource, TKey>(this IAsyncQueryable<TSource> source, Expression<Func<TSource, CancellationToken, ValueTask<TKey>>> keySelector, CancellationToken cancellationToken)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (keySelector == null)
+                throw new ArgumentNullException(nameof(keySelector));
+
+#if CRIPPLED_REFLECTION
+            return source.Provider.ExecuteAsync<IList<TSource>>(Expression.Call(InfoOf(() => AsyncQueryableEx.MinByAsync<TSource, TKey>(default(IAsyncQueryable<TSource>), default(Expression<Func<TSource, CancellationToken, ValueTask<TKey>>>), default(CancellationToken))), source.Expression, keySelector, Expression.Constant(cancellationToken, typeof(CancellationToken))), cancellationToken);
+#else
+            return source.Provider.ExecuteAsync<IList<TSource>>(Expression.Call(((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(TSource), typeof(TKey)), source.Expression, keySelector, Expression.Constant(cancellationToken, typeof(CancellationToken))), cancellationToken);
+#endif
+        }
+
         public static Task<IList<TSource>> MinByAsync<TSource, TKey>(this IAsyncQueryable<TSource> source, Expression<Func<TSource, TKey>> keySelector, CancellationToken cancellationToken)
         {
             if (source == null)
@@ -760,6 +926,22 @@ namespace System.Linq
 #endif
         }
 
+        public static Task<IList<TSource>> MinByAsync<TSource, TKey>(this IAsyncQueryable<TSource> source, Expression<Func<TSource, CancellationToken, ValueTask<TKey>>> keySelector, IComparer<TKey> comparer, CancellationToken cancellationToken)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (keySelector == null)
+                throw new ArgumentNullException(nameof(keySelector));
+            if (comparer == null)
+                throw new ArgumentNullException(nameof(comparer));
+
+#if CRIPPLED_REFLECTION
+            return source.Provider.ExecuteAsync<IList<TSource>>(Expression.Call(InfoOf(() => AsyncQueryableEx.MinByAsync<TSource, TKey>(default(IAsyncQueryable<TSource>), default(Expression<Func<TSource, CancellationToken, ValueTask<TKey>>>), default(IComparer<TKey>), default(CancellationToken))), source.Expression, keySelector, Expression.Constant(comparer, typeof(IComparer<TKey>)), Expression.Constant(cancellationToken, typeof(CancellationToken))), cancellationToken);
+#else
+            return source.Provider.ExecuteAsync<IList<TSource>>(Expression.Call(((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(TSource), typeof(TKey)), source.Expression, keySelector, Expression.Constant(comparer, typeof(IComparer<TKey>)), Expression.Constant(cancellationToken, typeof(CancellationToken))), cancellationToken);
+#endif
+        }
+
         public static Task<IList<TSource>> MinByAsync<TSource, TKey>(this IAsyncQueryable<TSource> source, Expression<Func<TSource, TKey>> keySelector, IComparer<TKey> comparer, CancellationToken cancellationToken)
         {
             if (source == null)
@@ -854,6 +1036,20 @@ namespace System.Linq
 #endif
         }
 
+        public static IAsyncQueryable<TSource> Scan<TSource>(this IAsyncQueryable<TSource> source, Expression<Func<TSource, TSource, CancellationToken, ValueTask<TSource>>> accumulator)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (accumulator == null)
+                throw new ArgumentNullException(nameof(accumulator));
+
+#if CRIPPLED_REFLECTION
+            return source.Provider.CreateQuery<TSource>(Expression.Call(InfoOf(() => AsyncQueryableEx.Scan<TSource>(default(IAsyncQueryable<TSource>), default(Expression<Func<TSource, TSource, CancellationToken, ValueTask<TSource>>>))), source.Expression, accumulator));
+#else
+            return source.Provider.CreateQuery<TSource>(Expression.Call(((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(TSource)), source.Expression, accumulator));
+#endif
+        }
+
         public static IAsyncQueryable<TSource> Scan<TSource>(this IAsyncQueryable<TSource> source, Expression<Func<TSource, TSource, TSource>> accumulator)
         {
             if (source == null)
@@ -882,6 +1078,20 @@ namespace System.Linq
 #endif
         }
 
+        public static IAsyncQueryable<TAccumulate> Scan<TSource, TAccumulate>(this IAsyncQueryable<TSource> source, TAccumulate seed, Expression<Func<TAccumulate, TSource, CancellationToken, ValueTask<TAccumulate>>> accumulator)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (accumulator == null)
+                throw new ArgumentNullException(nameof(accumulator));
+
+#if CRIPPLED_REFLECTION
+            return source.Provider.CreateQuery<TAccumulate>(Expression.Call(InfoOf(() => AsyncQueryableEx.Scan<TSource, TAccumulate>(default(IAsyncQueryable<TSource>), default(TAccumulate), default(Expression<Func<TAccumulate, TSource, CancellationToken, ValueTask<TAccumulate>>>))), source.Expression, Expression.Constant(seed, typeof(TAccumulate)), accumulator));
+#else
+            return source.Provider.CreateQuery<TAccumulate>(Expression.Call(((MethodInfo)MethodBase.GetCurrentMethod()).MakeGenericMethod(typeof(TSource), typeof(TAccumulate)), source.Expression, Expression.Constant(seed, typeof(TAccumulate)), accumulator));
+#endif
+        }
+
         public static IAsyncQueryable<TAccumulate> Scan<TSource, TAccumulate>(this IAsyncQueryable<TSource> source, TAccumulate seed, Expression<Func<TAccumulate, TSource, TAccumulate>> accumulator)
         {
             if (source == null)

+ 1 - 1
Ix.NET/Source/System.Interactive.Async/System.Interactive.Async.csproj

@@ -2,7 +2,7 @@
 
   <PropertyGroup>
     <Description>Interactive Extensions Async Library used to express queries over asynchronous enumerable sequences.</Description>
-    <AssemblyTitle>Interactive Extensions - Async Library</AssemblyTitle>    
+    <AssemblyTitle>Interactive Extensions - Async Library</AssemblyTitle>
     <TargetFrameworks>net45;net46;netstandard1.4;netstandard2.0</TargetFrameworks>
     <PackageTags>Ix;Interactive;Extensions;Enumerable;Asynchronous</PackageTags>
   </PropertyGroup>

+ 108 - 0
Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/Catch.cs

@@ -34,6 +34,19 @@ namespace System.Linq
             return new CatchAsyncIteratorWithTask<TSource, TException>(source, handler);
         }
 
+#if !NO_DEEP_CANCELLATION
+        public static IAsyncEnumerable<TSource> Catch<TSource, TException>(this IAsyncEnumerable<TSource> source, Func<TException, CancellationToken, ValueTask<IAsyncEnumerable<TSource>>> handler)
+            where TException : Exception
+        {
+            if (source == null)
+                throw Error.ArgumentNull(nameof(source));
+            if (handler == null)
+                throw Error.ArgumentNull(nameof(handler));
+
+            return new CatchAsyncIteratorWithTaskAndCancellation<TSource, TException>(source, handler);
+        }
+#endif
+
         public static IAsyncEnumerable<TSource> Catch<TSource>(this IEnumerable<IAsyncEnumerable<TSource>> sources)
         {
             if (sources == null)
@@ -251,6 +264,101 @@ namespace System.Linq
             }
         }
 
+#if !NO_DEEP_CANCELLATION
+        private sealed class CatchAsyncIteratorWithTaskAndCancellation<TSource, TException> : AsyncIterator<TSource> where TException : Exception
+        {
+            private readonly Func<TException, CancellationToken, ValueTask<IAsyncEnumerable<TSource>>> _handler;
+            private readonly IAsyncEnumerable<TSource> _source;
+
+            private IAsyncEnumerator<TSource> _enumerator;
+            private bool _isDone;
+
+            public CatchAsyncIteratorWithTaskAndCancellation(IAsyncEnumerable<TSource> source, Func<TException, CancellationToken, ValueTask<IAsyncEnumerable<TSource>>> handler)
+            {
+                Debug.Assert(source != null);
+                Debug.Assert(handler != null);
+
+                _source = source;
+                _handler = handler;
+            }
+
+            public override AsyncIteratorBase<TSource> Clone()
+            {
+                return new CatchAsyncIteratorWithTaskAndCancellation<TSource, TException>(_source, _handler);
+            }
+
+            public override async ValueTask DisposeAsync()
+            {
+                if (_enumerator != null)
+                {
+                    await _enumerator.DisposeAsync().ConfigureAwait(false);
+                    _enumerator = null;
+                }
+
+                await base.DisposeAsync().ConfigureAwait(false);
+            }
+
+            protected override async ValueTask<bool> MoveNextCore()
+            {
+                switch (_state)
+                {
+                    case AsyncIteratorState.Allocated:
+                        _enumerator = _source.GetAsyncEnumerator(_cancellationToken);
+                        _isDone = false;
+
+                        _state = AsyncIteratorState.Iterating;
+                        goto case AsyncIteratorState.Iterating;
+
+                    case AsyncIteratorState.Iterating:
+                        while (true)
+                        {
+                            if (!_isDone)
+                            {
+                                try
+                                {
+                                    if (await _enumerator.MoveNextAsync().ConfigureAwait(false))
+                                    {
+                                        _current = _enumerator.Current;
+                                        return true;
+                                    }
+                                }
+                                catch (TException ex)
+                                {
+                                    // Note: Ideally we'd dipose of the previous enumerator before
+                                    // invoking the handler, but we use this order to preserve
+                                    // current behavior
+                                    var inner = await _handler(ex, _cancellationToken).ConfigureAwait(false);
+                                    var err = inner.GetAsyncEnumerator(_cancellationToken);
+
+                                    if (_enumerator != null)
+                                    {
+                                        await _enumerator.DisposeAsync().ConfigureAwait(false);
+                                    }
+
+                                    _enumerator = err;
+                                    _isDone = true;
+                                    continue; // loop so we hit the catch state
+                                }
+                            }
+
+                            if (await _enumerator.MoveNextAsync().ConfigureAwait(false))
+                            {
+                                _current = _enumerator.Current;
+                                return true;
+                            }
+
+                            break; // while
+                        }
+
+                        break; // case
+                }
+
+                await DisposeAsync().ConfigureAwait(false);
+                return false;
+            }
+        }
+#endif
+
         private sealed class CatchAsyncIterator<TSource> : AsyncIterator<TSource>
         {
             private readonly IEnumerable<IAsyncEnumerable<TSource>> _sources;

+ 68 - 1
Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/Defer.cs

@@ -6,7 +6,6 @@ using System.Collections.Generic;
 using System.Diagnostics;
 using System.Threading;
 using System.Threading.Tasks;
-using static System.Linq.AsyncEnumerable;
 
 namespace System.Linq
 {
@@ -28,6 +27,16 @@ namespace System.Linq
             return new AsyncDeferIterator<TSource>(factory);
         }
 
+#if !NO_DEEP_CANCELLATION
+        public static IAsyncEnumerable<TSource> Defer<TSource>(Func<CancellationToken, Task<IAsyncEnumerable<TSource>>> factory)
+        {
+            if (factory == null)
+                throw Error.ArgumentNull(nameof(factory));
+
+            return new AsyncDeferIteratorWithCancellation<TSource>(factory);
+        }
+#endif
+
         private sealed class DeferIterator<T> : AsyncIteratorBase<T>
         {
             private readonly Func<IAsyncEnumerable<T>> _factory;
@@ -141,5 +150,63 @@ namespace System.Linq
                 return await _enumerator.MoveNextAsync().ConfigureAwait(false);
             }
         }
+
+#if !NO_DEEP_CANCELLATION
+        private sealed class AsyncDeferIteratorWithCancellation<T> : AsyncIteratorBase<T>
+        {
+            private readonly Func<CancellationToken, Task<IAsyncEnumerable<T>>> _factory;
+            private IAsyncEnumerator<T> _enumerator;
+
+            public AsyncDeferIteratorWithCancellation(Func<CancellationToken, Task<IAsyncEnumerable<T>>> factory)
+            {
+                Debug.Assert(factory != null);
+
+                _factory = factory;
+            }
+
+            public override T Current => _enumerator == null ? default : _enumerator.Current;
+
+            public override AsyncIteratorBase<T> Clone()
+            {
+                return new AsyncDeferIteratorWithCancellation<T>(_factory);
+            }
+
+            public override async ValueTask DisposeAsync()
+            {
+                if (_enumerator != null)
+                {
+                    await _enumerator.DisposeAsync().ConfigureAwait(false);
+                    _enumerator = null;
+                }
+
+                await base.DisposeAsync().ConfigureAwait(false);
+            }
+
+            protected override ValueTask<bool> MoveNextCore()
+            {
+                if (_enumerator == null)
+                {
+                    return InitializeAndMoveNextAsync();
+                }
+
+                return _enumerator.MoveNextAsync();
+            }
+
+            private async ValueTask<bool> InitializeAndMoveNextAsync()
+            {
+                try
+                {
+                    _enumerator = (await _factory(_cancellationToken).ConfigureAwait(false)).GetAsyncEnumerator(_cancellationToken);
+                }
+                catch (Exception ex)
+                {
+                    _enumerator = Throw<T>(ex).GetAsyncEnumerator(_cancellationToken);
+                    throw;
+                }
+
+                return await _enumerator.MoveNextAsync().ConfigureAwait(false);
+            }
+        }
+#endif
     }
 }

+ 6 - 0
Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/Distinct.cs

@@ -41,6 +41,9 @@ namespace System.Linq
             return DistinctCore<TSource, TKey>(source, keySelector, comparer: null);
         }
 
+#if !NO_DEEP_CANCELLATION // TODO
+#endif
+
         public static IAsyncEnumerable<TSource> Distinct<TSource, TKey>(this IAsyncEnumerable<TSource> source, Func<TSource, ValueTask<TKey>> keySelector, IEqualityComparer<TKey> comparer)
         {
             if (source == null)
@@ -53,6 +56,9 @@ namespace System.Linq
             return DistinctCore(source, keySelector, comparer);
         }
 
+#if !NO_DEEP_CANCELLATION // TODO
+#endif
+
         private static IAsyncEnumerable<TSource> DistinctCore<TSource, TKey>(IAsyncEnumerable<TSource> source, Func<TSource, TKey> keySelector, IEqualityComparer<TKey> comparer)
         {
             return new DistinctAsyncIterator<TSource, TKey>(source, keySelector, comparer);

+ 106 - 0
Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/DistinctUntilChanged.cs

@@ -57,6 +57,18 @@ namespace System.Linq
             return DistinctUntilChangedCore<TSource, TKey>(source, keySelector, comparer: null);
         }
 
+#if !NO_DEEP_CANCELLATION
+        public static IAsyncEnumerable<TSource> DistinctUntilChanged<TSource, TKey>(this IAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, ValueTask<TKey>> keySelector)
+        {
+            if (source == null)
+                throw Error.ArgumentNull(nameof(source));
+            if (keySelector == null)
+                throw Error.ArgumentNull(nameof(keySelector));
+
+            return DistinctUntilChangedCore<TSource, TKey>(source, keySelector, comparer: null);
+        }
+#endif
+
         public static IAsyncEnumerable<TSource> DistinctUntilChanged<TSource, TKey>(this IAsyncEnumerable<TSource> source, Func<TSource, ValueTask<TKey>> keySelector, IEqualityComparer<TKey> comparer)
         {
             if (source == null)
@@ -69,6 +81,20 @@ namespace System.Linq
             return DistinctUntilChangedCore(source, keySelector, comparer);
         }
 
+#if !NO_DEEP_CANCELLATION
+        public static IAsyncEnumerable<TSource> DistinctUntilChanged<TSource, TKey>(this IAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, ValueTask<TKey>> keySelector, IEqualityComparer<TKey> comparer)
+        {
+            if (source == null)
+                throw Error.ArgumentNull(nameof(source));
+            if (keySelector == null)
+                throw Error.ArgumentNull(nameof(keySelector));
+            if (comparer == null)
+                throw Error.ArgumentNull(nameof(comparer));
+
+            return DistinctUntilChangedCore(source, keySelector, comparer);
+        }
+#endif
+
         private static IAsyncEnumerable<TSource> DistinctUntilChangedCore<TSource>(IAsyncEnumerable<TSource> source, IEqualityComparer<TSource> comparer)
         {
             return new DistinctUntilChangedAsyncIterator<TSource>(source, comparer);
@@ -84,6 +110,13 @@ namespace System.Linq
             return new DistinctUntilChangedAsyncIteratorWithTask<TSource, TKey>(source, keySelector, comparer);
         }
 
+#if !NO_DEEP_CANCELLATION
+        private static IAsyncEnumerable<TSource> DistinctUntilChangedCore<TSource, TKey>(IAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, ValueTask<TKey>> keySelector, IEqualityComparer<TKey> comparer)
+        {
+            return new DistinctUntilChangedAsyncIteratorWithTaskAndCancellation<TSource, TKey>(source, keySelector, comparer);
+        }
+#endif
+
         private sealed class DistinctUntilChangedAsyncIterator<TSource> : AsyncIterator<TSource>
         {
             private readonly IEqualityComparer<TSource> _comparer;
@@ -296,5 +329,78 @@ namespace System.Linq
                 return false;
             }
         }
+
+#if !NO_DEEP_CANCELLATION
+        private sealed class DistinctUntilChangedAsyncIteratorWithTaskAndCancellation<TSource, TKey> : AsyncIterator<TSource>
+        {
+            private readonly IEqualityComparer<TKey> _comparer;
+            private readonly Func<TSource, CancellationToken, ValueTask<TKey>> _keySelector;
+            private readonly IAsyncEnumerable<TSource> _source;
+            private TKey _currentKeyValue;
+
+            private IAsyncEnumerator<TSource> _enumerator;
+            private bool _hasCurrentKey;
+
+            public DistinctUntilChangedAsyncIteratorWithTaskAndCancellation(IAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, ValueTask<TKey>> keySelector, IEqualityComparer<TKey> comparer)
+            {
+                _source = source;
+                _keySelector = keySelector;
+                _comparer = comparer ?? EqualityComparer<TKey>.Default;
+            }
+
+            public override AsyncIteratorBase<TSource> Clone()
+            {
+                return new DistinctUntilChangedAsyncIteratorWithTaskAndCancellation<TSource, TKey>(_source, _keySelector, _comparer);
+            }
+
+            public override async ValueTask DisposeAsync()
+            {
+                if (_enumerator != null)
+                {
+                    await _enumerator.DisposeAsync().ConfigureAwait(false);
+                    _enumerator = null;
+                    _currentKeyValue = default;
+                }
+
+                await base.DisposeAsync().ConfigureAwait(false);
+            }
+
+            protected override async ValueTask<bool> MoveNextCore()
+            {
+                switch (_state)
+                {
+                    case AsyncIteratorState.Allocated:
+                        _enumerator = _source.GetAsyncEnumerator(_cancellationToken);
+                        _state = AsyncIteratorState.Iterating;
+                        goto case AsyncIteratorState.Iterating;
+
+                    case AsyncIteratorState.Iterating:
+                        while (await _enumerator.MoveNextAsync().ConfigureAwait(false))
+                        {
+                            var item = _enumerator.Current;
+                            var key = await _keySelector(item, _cancellationToken).ConfigureAwait(false);
+                            var comparerEquals = false;
+
+                            if (_hasCurrentKey)
+                            {
+                                comparerEquals = _comparer.Equals(_currentKeyValue, key);
+                            }
+                            if (!_hasCurrentKey || !comparerEquals)
+                            {
+                                _hasCurrentKey = true;
+                                _currentKeyValue = key;
+                                _current = item;
+                                return true;
+                            }
+                        }
+
+                        break; // case
+                }
+
+                await DisposeAsync().ConfigureAwait(false);
+                return false;
+            }
+        }
+#endif
     }
 }

+ 140 - 0
Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/Do.cs

@@ -11,6 +11,8 @@ namespace System.Linq
 {
     public static partial class AsyncEnumerableEx
     {
+        // REVIEW: Should we convert Task-based overloads to ValueTask?
+
         public static IAsyncEnumerable<TSource> Do<TSource>(this IAsyncEnumerable<TSource> source, Action<TSource> onNext)
         {
             if (source == null)
@@ -107,6 +109,56 @@ namespace System.Linq
             return DoCore(source, onNext, onError, onCompleted);
         }
 
+#if !NO_DEEP_CANCELLATION
+        public static IAsyncEnumerable<TSource> Do<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, Task> onNext)
+        {
+            if (source == null)
+                throw Error.ArgumentNull(nameof(source));
+            if (onNext == null)
+                throw Error.ArgumentNull(nameof(onNext));
+
+            return DoCore(source, onNext: onNext, onError: null, onCompleted: null);
+        }
+
+        public static IAsyncEnumerable<TSource> Do<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, Task> onNext, Func<CancellationToken, Task> onCompleted)
+        {
+            if (source == null)
+                throw Error.ArgumentNull(nameof(source));
+            if (onNext == null)
+                throw Error.ArgumentNull(nameof(onNext));
+            if (onCompleted == null)
+                throw Error.ArgumentNull(nameof(onCompleted));
+
+            return DoCore(source, onNext: onNext, onError: null, onCompleted: onCompleted);
+        }
+
+        public static IAsyncEnumerable<TSource> Do<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, Task> onNext, Func<Exception, CancellationToken, Task> onError)
+        {
+            if (source == null)
+                throw Error.ArgumentNull(nameof(source));
+            if (onNext == null)
+                throw Error.ArgumentNull(nameof(onNext));
+            if (onError == null)
+                throw Error.ArgumentNull(nameof(onError));
+
+            return DoCore(source, onNext: onNext, onError: onError, onCompleted: null);
+        }
+
+        public static IAsyncEnumerable<TSource> Do<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, Task> onNext, Func<Exception, CancellationToken, Task> onError, Func<CancellationToken, Task> onCompleted)
+        {
+            if (source == null)
+                throw Error.ArgumentNull(nameof(source));
+            if (onNext == null)
+                throw Error.ArgumentNull(nameof(onNext));
+            if (onError == null)
+                throw Error.ArgumentNull(nameof(onError));
+            if (onCompleted == null)
+                throw Error.ArgumentNull(nameof(onCompleted));
+
+            return DoCore(source, onNext, onError, onCompleted);
+        }
+#endif
+
         public static IAsyncEnumerable<TSource> Do<TSource>(this IAsyncEnumerable<TSource> source, IObserver<TSource> observer)
         {
             if (source == null)
@@ -127,6 +179,13 @@ namespace System.Linq
             return new DoAsyncIteratorWithTask<TSource>(source, onNext, onError, onCompleted);
         }
 
+#if !NO_DEEP_CANCELLATION
+        private static IAsyncEnumerable<TSource> DoCore<TSource>(IAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, Task> onNext, Func<Exception, CancellationToken, Task> onError, Func<CancellationToken, Task> onCompleted)
+        {
+            return new DoAsyncIteratorWithTaskAndCancellation<TSource>(source, onNext, onError, onCompleted);
+        }
+#endif
+
         private sealed class DoAsyncIterator<TSource> : AsyncIterator<TSource>
         {
             private readonly Action _onCompleted;
@@ -281,5 +340,86 @@ namespace System.Linq
                 return false;
             }
         }
+
+#if !NO_DEEP_CANCELLATION
+        private sealed class DoAsyncIteratorWithTaskAndCancellation<TSource> : AsyncIterator<TSource>
+        {
+            private readonly Func<CancellationToken, Task> _onCompleted;
+            private readonly Func<Exception, CancellationToken, Task> _onError;
+            private readonly Func<TSource, CancellationToken, Task> _onNext;
+            private readonly IAsyncEnumerable<TSource> _source;
+
+            private IAsyncEnumerator<TSource> _enumerator;
+
+            public DoAsyncIteratorWithTaskAndCancellation(IAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, Task> onNext, Func<Exception, CancellationToken, Task> onError, Func<CancellationToken, Task> onCompleted)
+            {
+                Debug.Assert(source != null);
+                Debug.Assert(onNext != null);
+
+                _source = source;
+                _onNext = onNext;
+                _onError = onError;
+                _onCompleted = onCompleted;
+            }
+
+            public override AsyncIteratorBase<TSource> Clone()
+            {
+                return new DoAsyncIteratorWithTaskAndCancellation<TSource>(_source, _onNext, _onError, _onCompleted);
+            }
+
+            public override async ValueTask DisposeAsync()
+            {
+                if (_enumerator != null)
+                {
+                    await _enumerator.DisposeAsync().ConfigureAwait(false);
+                    _enumerator = null;
+                }
+
+                await base.DisposeAsync().ConfigureAwait(false);
+            }
+
+            protected override async ValueTask<bool> MoveNextCore()
+            {
+                switch (_state)
+                {
+                    case AsyncIteratorState.Allocated:
+                        _enumerator = _source.GetAsyncEnumerator(_cancellationToken);
+                        _state = AsyncIteratorState.Iterating;
+                        goto case AsyncIteratorState.Iterating;
+
+                    case AsyncIteratorState.Iterating:
+                        try
+                        {
+                            if (await _enumerator.MoveNextAsync().ConfigureAwait(false))
+                            {
+                                _current = _enumerator.Current;
+                                await _onNext(_current, _cancellationToken).ConfigureAwait(false);
+
+                                return true;
+                            }
+                        }
+                        catch (OperationCanceledException)
+                        {
+                            throw;
+                        }
+                        catch (Exception ex) when (_onError != null)
+                        {
+                            await _onError(ex, _cancellationToken).ConfigureAwait(false);
+                            throw;
+                        }
+
+                        if (_onCompleted != null)
+                        {
+                            await _onCompleted(_cancellationToken).ConfigureAwait(false);
+                        }
+
+                        await DisposeAsync().ConfigureAwait(false);
+                        break;
+                }
+
+                return false;
+            }
+        }
+#endif
     }
 }

+ 104 - 0
Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/Expand.cs

@@ -31,6 +31,18 @@ namespace System.Linq
             return new ExpandAsyncIteratorWithTask<TSource>(source, selector);
         }
 
+#if !NO_DEEP_CANCELLATION
+        public static IAsyncEnumerable<TSource> Expand<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, ValueTask<IAsyncEnumerable<TSource>>> selector)
+        {
+            if (source == null)
+                throw Error.ArgumentNull(nameof(source));
+            if (selector == null)
+                throw Error.ArgumentNull(nameof(selector));
+
+            return new ExpandAsyncIteratorWithTaskAndCancellation<TSource>(source, selector);
+        }
+#endif
+
         private sealed class ExpandAsyncIterator<TSource> : AsyncIterator<TSource>
         {
             private readonly Func<TSource, IAsyncEnumerable<TSource>> _selector;
@@ -210,5 +222,97 @@ namespace System.Linq
                 return false;
             }
         }
+
+#if !NO_DEEP_CANCELLATION
+        private sealed class ExpandAsyncIteratorWithTaskAndCancellation<TSource> : AsyncIterator<TSource>
+        {
+            private readonly Func<TSource, CancellationToken, ValueTask<IAsyncEnumerable<TSource>>> _selector;
+            private readonly IAsyncEnumerable<TSource> _source;
+
+            private IAsyncEnumerator<TSource> _enumerator;
+
+            private Queue<IAsyncEnumerable<TSource>> _queue;
+
+            public ExpandAsyncIteratorWithTaskAndCancellation(IAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, ValueTask<IAsyncEnumerable<TSource>>> selector)
+            {
+                Debug.Assert(source != null);
+                Debug.Assert(selector != null);
+
+                _source = source;
+                _selector = selector;
+            }
+
+            public override AsyncIteratorBase<TSource> Clone()
+            {
+                return new ExpandAsyncIteratorWithTaskAndCancellation<TSource>(_source, _selector);
+            }
+
+            public override async ValueTask DisposeAsync()
+            {
+                if (_enumerator != null)
+                {
+                    await _enumerator.DisposeAsync().ConfigureAwait(false);
+                    _enumerator = null;
+                }
+
+                _queue = null;
+
+                await base.DisposeAsync().ConfigureAwait(false);
+            }
+
+            protected override async ValueTask<bool> MoveNextCore()
+            {
+                switch (_state)
+                {
+                    case AsyncIteratorState.Allocated:
+                        _queue = new Queue<IAsyncEnumerable<TSource>>();
+                        _queue.Enqueue(_source);
+
+                        _state = AsyncIteratorState.Iterating;
+                        goto case AsyncIteratorState.Iterating;
+
+                    case AsyncIteratorState.Iterating:
+                        while (true)
+                        {
+                            if (_enumerator == null)
+                            {
+                                if (_queue.Count > 0)
+                                {
+                                    var src = _queue.Dequeue();
+
+                                    if (_enumerator != null)
+                                    {
+                                        await _enumerator.DisposeAsync().ConfigureAwait(false);
+                                    }
+
+                                    _enumerator = src.GetAsyncEnumerator(_cancellationToken);
+
+                                    continue; // loop
+                                }
+
+                                break; // while
+                            }
+
+                            if (await _enumerator.MoveNextAsync().ConfigureAwait(false))
+                            {
+                                var item = _enumerator.Current;
+                                var next = await _selector(item, _cancellationToken).ConfigureAwait(false);
+                                _queue.Enqueue(next);
+                                _current = item;
+                                return true;
+                            }
+
+                            await _enumerator.DisposeAsync().ConfigureAwait(false);
+                            _enumerator = null;
+                        }
+
+                        break; // case
+                }
+
+                await DisposeAsync().ConfigureAwait(false);
+                return false;
+            }
+        }
+#endif
     }
 }

+ 2 - 1
Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/Finally.cs

@@ -4,7 +4,6 @@
 
 using System.Collections.Generic;
 using System.Diagnostics;
-using System.Threading;
 using System.Threading.Tasks;
 
 namespace System.Linq
@@ -31,6 +30,8 @@ namespace System.Linq
             return new FinallyAsyncIteratorWithTask<TSource>(source, finallyAction);
         }
 
+        // REVIEW: No cancellation support for finally action.
+
         private sealed class FinallyAsyncIterator<TSource> : AsyncIterator<TSource>
         {
             private readonly Action _finallyAction;

+ 2 - 0
Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/Generate.cs

@@ -23,6 +23,8 @@ namespace System.Linq
             return new GenerateAsyncIterator<TState, TResult>(initialState, condition, iterate, resultSelector);
         }
 
+        // REVIEW: Add async variant?
+
         private sealed class GenerateAsyncIterator<TState, TResult> : AsyncIterator<TResult>
         {
             private readonly Func<TState, bool> _condition;

+ 36 - 0
Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/MaxBy.cs

@@ -70,6 +70,18 @@ namespace System.Linq
             return MaxByCore<TSource, TKey>(source, keySelector, comparer: null, cancellationToken);
         }
 
+#if !NO_DEEP_CANCELLATION
+        public static Task<IList<TSource>> MaxByAsync<TSource, TKey>(this IAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, ValueTask<TKey>> keySelector, CancellationToken cancellationToken)
+        {
+            if (source == null)
+                throw Error.ArgumentNull(nameof(source));
+            if (keySelector == null)
+                throw Error.ArgumentNull(nameof(keySelector));
+
+            return MaxByCore<TSource, TKey>(source, keySelector, comparer: null, cancellationToken);
+        }
+#endif
+
         public static Task<IList<TSource>> MaxByAsync<TSource, TKey>(this IAsyncEnumerable<TSource> source, Func<TSource, ValueTask<TKey>> keySelector, IComparer<TKey> comparer)
         {
             if (source == null)
@@ -90,6 +102,18 @@ namespace System.Linq
             return MaxByCore(source, keySelector, comparer, cancellationToken);
         }
 
+#if !NO_DEEP_CANCELLATION
+        public static Task<IList<TSource>> MaxByAsync<TSource, TKey>(this IAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, ValueTask<TKey>> keySelector, IComparer<TKey> comparer, CancellationToken cancellationToken)
+        {
+            if (source == null)
+                throw Error.ArgumentNull(nameof(source));
+            if (keySelector == null)
+                throw Error.ArgumentNull(nameof(keySelector));
+
+            return MaxByCore(source, keySelector, comparer, cancellationToken);
+        }
+#endif
+
         private static Task<IList<TSource>> MaxByCore<TSource, TKey>(IAsyncEnumerable<TSource> source, Func<TSource, TKey> keySelector, IComparer<TKey> comparer, CancellationToken cancellationToken)
         {
             if (comparer == null)
@@ -109,5 +133,17 @@ namespace System.Linq
 
             return ExtremaBy(source, keySelector, (key, minValue) => comparer.Compare(key, minValue), cancellationToken);
         }
+
+#if !NO_DEEP_CANCELLATION
+        private static Task<IList<TSource>> MaxByCore<TSource, TKey>(IAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, ValueTask<TKey>> keySelector, IComparer<TKey> comparer, CancellationToken cancellationToken)
+        {
+            if (comparer == null)
+            {
+                comparer = Comparer<TKey>.Default;
+            }
+
+            return ExtremaBy(source, keySelector, (key, minValue) => comparer.Compare(key, minValue), cancellationToken);
+        }
+#endif
     }
 }

+ 78 - 0
Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/MinBy.cs

@@ -70,6 +70,18 @@ namespace System.Linq
             return MinByCore<TSource, TKey>(source, keySelector, comparer: null, cancellationToken);
         }
 
+#if !NO_DEEP_CANCELLATION
+        public static Task<IList<TSource>> MinByAsync<TSource, TKey>(this IAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, ValueTask<TKey>> keySelector, CancellationToken cancellationToken)
+        {
+            if (source == null)
+                throw Error.ArgumentNull(nameof(source));
+            if (keySelector == null)
+                throw Error.ArgumentNull(nameof(keySelector));
+
+            return MinByCore<TSource, TKey>(source, keySelector, comparer: null, cancellationToken);
+        }
+#endif
+
         public static Task<IList<TSource>> MinByAsync<TSource, TKey>(this IAsyncEnumerable<TSource> source, Func<TSource, ValueTask<TKey>> keySelector, IComparer<TKey> comparer)
         {
             if (source == null)
@@ -90,6 +102,18 @@ namespace System.Linq
             return MinByCore(source, keySelector, comparer, cancellationToken);
         }
 
+#if !NO_DEEP_CANCELLATION
+        public static Task<IList<TSource>> MinByAsync<TSource, TKey>(this IAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, ValueTask<TKey>> keySelector, IComparer<TKey> comparer, CancellationToken cancellationToken)
+        {
+            if (source == null)
+                throw Error.ArgumentNull(nameof(source));
+            if (keySelector == null)
+                throw Error.ArgumentNull(nameof(keySelector));
+
+            return MinByCore(source, keySelector, comparer, cancellationToken);
+        }
+#endif
+
         private static Task<IList<TSource>> MinByCore<TSource, TKey>(IAsyncEnumerable<TSource> source, Func<TSource, TKey> keySelector, IComparer<TKey> comparer, CancellationToken cancellationToken)
         {
             if (comparer == null)
@@ -110,6 +134,18 @@ namespace System.Linq
             return ExtremaBy(source, keySelector, (key, minValue) => -comparer.Compare(key, minValue), cancellationToken);
         }
 
+#if !NO_DEEP_CANCELLATION
+        private static Task<IList<TSource>> MinByCore<TSource, TKey>(IAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, ValueTask<TKey>> keySelector, IComparer<TKey> comparer, CancellationToken cancellationToken)
+        {
+            if (comparer == null)
+            {
+                comparer = Comparer<TKey>.Default;
+            }
+
+            return ExtremaBy(source, keySelector, (key, minValue) => -comparer.Compare(key, minValue), cancellationToken);
+        }
+#endif
+
         private static async Task<IList<TSource>> ExtremaBy<TSource, TKey>(IAsyncEnumerable<TSource> source, Func<TSource, TKey> keySelector, Func<TKey, TKey, int> compare, CancellationToken cancellationToken)
         {
             var result = new List<TSource>();
@@ -189,5 +225,47 @@ namespace System.Linq
 
             return result;
         }
+
+#if !NO_DEEP_CANCELLATION
+        private static async Task<IList<TSource>> ExtremaBy<TSource, TKey>(IAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, ValueTask<TKey>> keySelector, Func<TKey, TKey, int> compare, CancellationToken cancellationToken)
+        {
+            var result = new List<TSource>();
+
+            var e = source.GetAsyncEnumerator(cancellationToken);
+
+            try
+            {
+                if (!await e.MoveNextAsync().ConfigureAwait(false))
+                    throw Error.NoElements();
+
+                var current = e.Current;
+                var resKey = await keySelector(current, cancellationToken).ConfigureAwait(false);
+                result.Add(current);
+
+                while (await e.MoveNextAsync().ConfigureAwait(false))
+                {
+                    var cur = e.Current;
+                    var key = await keySelector(cur, cancellationToken).ConfigureAwait(false);
+
+                    var cmp = compare(key, resKey);
+                    if (cmp == 0)
+                    {
+                        result.Add(cur);
+                    }
+                    else if (cmp > 0)
+                    {
+                        result = new List<TSource> { cur };
+                        resKey = key;
+                    }
+                }
+            }
+            finally
+            {
+                await e.DisposeAsync().ConfigureAwait(false);
+            }
+
+            return result;
+        }
+#endif
     }
 }

+ 167 - 0
Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/Scan.cs

@@ -41,6 +41,18 @@ namespace System.Linq
             return new ScanAsyncEnumerableWithTask<TSource>(source, accumulator);
         }
 
+#if !NO_DEEP_CANCELLATION
+        public static IAsyncEnumerable<TSource> Scan<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, TSource, CancellationToken, ValueTask<TSource>> accumulator)
+        {
+            if (source == null)
+                throw Error.ArgumentNull(nameof(source));
+            if (accumulator == null)
+                throw Error.ArgumentNull(nameof(accumulator));
+
+            return new ScanAsyncEnumerableWithTaskAndCancellation<TSource>(source, accumulator);
+        }
+#endif
+
         public static IAsyncEnumerable<TAccumulate> Scan<TSource, TAccumulate>(this IAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, ValueTask<TAccumulate>> accumulator)
         {
             if (source == null)
@@ -51,6 +63,18 @@ namespace System.Linq
             return new ScanAsyncEnumerableWithTask<TSource, TAccumulate>(source, seed, accumulator);
         }
 
+#if !NO_DEEP_CANCELLATION
+        public static IAsyncEnumerable<TAccumulate> Scan<TSource, TAccumulate>(this IAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, CancellationToken, ValueTask<TAccumulate>> accumulator)
+        {
+            if (source == null)
+                throw Error.ArgumentNull(nameof(source));
+            if (accumulator == null)
+                throw Error.ArgumentNull(nameof(accumulator));
+
+            return new ScanAsyncEnumerableWithTaskAndCancellation<TSource, TAccumulate>(source, seed, accumulator);
+        }
+#endif
+
         private sealed class ScanAsyncEnumerable<TSource> : AsyncIterator<TSource>
         {
             private readonly Func<TSource, TSource, TSource> _accumulator;
@@ -263,6 +287,82 @@ namespace System.Linq
             }
         }
 
+#if !NO_DEEP_CANCELLATION
+        private sealed class ScanAsyncEnumerableWithTaskAndCancellation<TSource> : AsyncIterator<TSource>
+        {
+            private readonly Func<TSource, TSource, CancellationToken, ValueTask<TSource>> _accumulator;
+            private readonly IAsyncEnumerable<TSource> _source;
+
+            private TSource _accumulated;
+            private IAsyncEnumerator<TSource> _enumerator;
+
+            private bool _hasSeed;
+
+            public ScanAsyncEnumerableWithTaskAndCancellation(IAsyncEnumerable<TSource> source, Func<TSource, TSource, CancellationToken, ValueTask<TSource>> accumulator)
+            {
+                Debug.Assert(source != null);
+                Debug.Assert(accumulator != null);
+
+                _source = source;
+                _accumulator = accumulator;
+            }
+
+            public override AsyncIteratorBase<TSource> Clone()
+            {
+                return new ScanAsyncEnumerableWithTaskAndCancellation<TSource>(_source, _accumulator);
+            }
+
+            public override async ValueTask DisposeAsync()
+            {
+                if (_enumerator != null)
+                {
+                    await _enumerator.DisposeAsync().ConfigureAwait(false);
+                    _enumerator = null;
+                    _accumulated = default;
+                }
+
+                await base.DisposeAsync().ConfigureAwait(false);
+            }
+
+            protected override async ValueTask<bool> MoveNextCore()
+            {
+                switch (_state)
+                {
+                    case AsyncIteratorState.Allocated:
+                        _enumerator = _source.GetAsyncEnumerator(_cancellationToken);
+                        _hasSeed = false;
+                        _accumulated = default;
+
+                        _state = AsyncIteratorState.Iterating;
+                        goto case AsyncIteratorState.Iterating;
+
+                    case AsyncIteratorState.Iterating:
+
+                        while (await _enumerator.MoveNextAsync().ConfigureAwait(false))
+                        {
+                            var item = _enumerator.Current;
+                            if (!_hasSeed)
+                            {
+                                _hasSeed = true;
+                                _accumulated = item;
+                                continue; // loop
+                            }
+
+                            _accumulated = await _accumulator(_accumulated, item, _cancellationToken).ConfigureAwait(false);
+                            _current = _accumulated;
+                            return true;
+                        }
+
+                        break; // case
+
+                }
+
+                await DisposeAsync().ConfigureAwait(false);
+                return false;
+            }
+        }
+#endif
+
         private sealed class ScanAsyncEnumerableWithTask<TSource, TAccumulate> : AsyncIterator<TAccumulate>
         {
             private readonly Func<TAccumulate, TSource, ValueTask<TAccumulate>> _accumulator;
@@ -327,5 +427,72 @@ namespace System.Linq
                 return false;
             }
         }
+
+#if !NO_DEEP_CANCELLATION
+        private sealed class ScanAsyncEnumerableWithTaskAndCancellation<TSource, TAccumulate> : AsyncIterator<TAccumulate>
+        {
+            private readonly Func<TAccumulate, TSource, CancellationToken, ValueTask<TAccumulate>> _accumulator;
+            private readonly TAccumulate _seed;
+            private readonly IAsyncEnumerable<TSource> _source;
+
+            private TAccumulate _accumulated;
+            private IAsyncEnumerator<TSource> _enumerator;
+
+            public ScanAsyncEnumerableWithTaskAndCancellation(IAsyncEnumerable<TSource> source, TAccumulate seed, Func<TAccumulate, TSource, CancellationToken, ValueTask<TAccumulate>> accumulator)
+            {
+                Debug.Assert(source != null);
+                Debug.Assert(accumulator != null);
+
+                _source = source;
+                _seed = seed;
+                _accumulator = accumulator;
+            }
+
+            public override AsyncIteratorBase<TAccumulate> Clone()
+            {
+                return new ScanAsyncEnumerableWithTaskAndCancellation<TSource, TAccumulate>(_source, _seed, _accumulator);
+            }
+
+            public override async ValueTask DisposeAsync()
+            {
+                if (_enumerator != null)
+                {
+                    await _enumerator.DisposeAsync().ConfigureAwait(false);
+                    _enumerator = null;
+                    _accumulated = default;
+                }
+
+                await base.DisposeAsync().ConfigureAwait(false);
+            }
+
+            protected override async ValueTask<bool> MoveNextCore()
+            {
+                switch (_state)
+                {
+                    case AsyncIteratorState.Allocated:
+                        _enumerator = _source.GetAsyncEnumerator(_cancellationToken);
+                        _accumulated = _seed;
+
+                        _state = AsyncIteratorState.Iterating;
+                        goto case AsyncIteratorState.Iterating;
+
+                    case AsyncIteratorState.Iterating:
+                        if (await _enumerator.MoveNextAsync().ConfigureAwait(false))
+                        {
+                            var item = _enumerator.Current;
+                            _accumulated = await _accumulator(_accumulated, item, _cancellationToken).ConfigureAwait(false);
+                            _current = _accumulated;
+                            return true;
+                        }
+
+                        break;
+
+                }
+
+                await DisposeAsync().ConfigureAwait(false);
+                return false;
+            }
+        }
+#endif
     }
 }

+ 83 - 0
Ix.NET/Source/System.Interactive.Async/System/Linq/Operators/Using.cs

@@ -11,6 +11,8 @@ namespace System.Linq
 {
     public static partial class AsyncEnumerableEx
     {
+        // REVIEW: Add support for IAsyncDisposable resources.
+
         public static IAsyncEnumerable<TSource> Using<TSource, TResource>(Func<TResource> resourceFactory, Func<TResource, IAsyncEnumerable<TSource>> enumerableFactory) where TResource : IDisposable
         {
             if (resourceFactory == null)
@@ -31,6 +33,18 @@ namespace System.Linq
             return new UsingAsyncIteratorWithTask<TSource, TResource>(resourceFactory, enumerableFactory);
         }
 
+#if !NO_DEEP_CANCELLATION
+        public static IAsyncEnumerable<TSource> Using<TSource, TResource>(Func<CancellationToken, Task<TResource>> resourceFactory, Func<TResource, CancellationToken, ValueTask<IAsyncEnumerable<TSource>>> enumerableFactory) where TResource : IDisposable
+        {
+            if (resourceFactory == null)
+                throw Error.ArgumentNull(nameof(resourceFactory));
+            if (enumerableFactory == null)
+                throw Error.ArgumentNull(nameof(enumerableFactory));
+
+            return new UsingAsyncIteratorWithTaskAndCancellation<TSource, TResource>(resourceFactory, enumerableFactory);
+        }
+#endif
+
         private sealed class UsingAsyncIterator<TSource, TResource> : AsyncIterator<TSource> where TResource : IDisposable
         {
             private readonly Func<TResource, IAsyncEnumerable<TSource>> _enumerableFactory;
@@ -165,5 +179,74 @@ namespace System.Linq
                 return false;
             }
         }
+
+#if !NO_DEEP_CANCELLATION
+        private sealed class UsingAsyncIteratorWithTaskAndCancellation<TSource, TResource> : AsyncIterator<TSource> where TResource : IDisposable
+        {
+            private readonly Func<TResource, CancellationToken, ValueTask<IAsyncEnumerable<TSource>>> _enumerableFactory;
+            private readonly Func<CancellationToken, Task<TResource>> _resourceFactory;
+
+            private IAsyncEnumerable<TSource> _enumerable;
+            private IAsyncEnumerator<TSource> _enumerator;
+            private TResource _resource;
+
+            public UsingAsyncIteratorWithTaskAndCancellation(Func<CancellationToken, Task<TResource>> resourceFactory, Func<TResource, CancellationToken, ValueTask<IAsyncEnumerable<TSource>>> enumerableFactory)
+            {
+                Debug.Assert(resourceFactory != null);
+                Debug.Assert(enumerableFactory != null);
+
+                _resourceFactory = resourceFactory;
+                _enumerableFactory = enumerableFactory;
+            }
+
+            public override AsyncIteratorBase<TSource> Clone()
+            {
+                return new UsingAsyncIteratorWithTaskAndCancellation<TSource, TResource>(_resourceFactory, _enumerableFactory);
+            }
+
+            public override async ValueTask DisposeAsync()
+            {
+                if (_enumerator != null)
+                {
+                    await _enumerator.DisposeAsync().ConfigureAwait(false);
+                    _enumerator = null;
+                }
+
+                if (_resource != null)
+                {
+                    _resource.Dispose();
+                    _resource = default;
+                }
+
+                await base.DisposeAsync().ConfigureAwait(false);
+            }
+
+            protected override async ValueTask<bool> MoveNextCore()
+            {
+                switch (_state)
+                {
+                    case AsyncIteratorState.Allocated:
+                        _resource = await _resourceFactory(_cancellationToken).ConfigureAwait(false);
+                        _enumerable = await _enumerableFactory(_resource, _cancellationToken).ConfigureAwait(false);
+
+                        _enumerator = _enumerable.GetAsyncEnumerator(_cancellationToken);
+                        _state = AsyncIteratorState.Iterating;
+                        goto case AsyncIteratorState.Iterating;
+
+                    case AsyncIteratorState.Iterating:
+                        while (await _enumerator.MoveNextAsync().ConfigureAwait(false))
+                        {
+                            _current = _enumerator.Current;
+                            return true;
+                        }
+
+                        await DisposeAsync().ConfigureAwait(false);
+                        break;
+                }
+
+                return false;
+            }
+        }
+#endif
     }
 }

+ 0 - 1
Ix.NET/Source/refs/Directory.build.props

@@ -5,7 +5,6 @@
   <PropertyGroup>
     <!-- This is here so we can create a fake .NET Standard 2.1 facade -->
     <NETStandardMaximumVersion>2.1</NETStandardMaximumVersion>
-
     <DefineConstants>$(DefineConstants);REFERENCE_ASSEMBLY</DefineConstants>
   </PropertyGroup>
 

+ 19 - 0
Ix.NET/Source/refs/System.Interactive.Async.Providers.Ref/System.Interactive.Async.Providers.Ref.csproj

@@ -0,0 +1,19 @@
+<Project Sdk="MSBuild.Sdk.Extras">
+
+  <PropertyGroup>
+    <Description>Interactive Extensions Async Providers Library used to build query providers and express queries over enumerable sequences.</Description>
+    <AssemblyTitle>Interactive Extensions - Async Providers Library</AssemblyTitle>
+    <TargetFrameworks>netcoreapp2.0;netstandard2.1</TargetFrameworks>
+    <PackageTags>Ix;Interactive;Extensions;Enumerable;Asynchronous</PackageTags>
+  </PropertyGroup>
+
+  <ItemGroup>
+    <Compile Include="..\..\System.Interactive.Async.Providers\**\*.cs" Exclude="..\..\System.Interactive.Async.Providers\obj\**" />
+  </ItemGroup>
+
+  <ItemGroup>
+    <ProjectReference Include="..\System.Interactive.Async.Ref\System.Interactive.Async.Ref.csproj" />
+    <ProjectReference Include="..\System.Linq.Async.Queryable.Ref\System.Linq.Async.Queryable.Ref.csproj" />
+  </ItemGroup>
+
+</Project>

+ 23 - 0
Ix.NET/Source/refs/System.Interactive.Async.Ref/System.Interactive.Async.Ref.csproj

@@ -0,0 +1,23 @@
+<Project Sdk="MSBuild.Sdk.Extras">
+
+  <PropertyGroup>
+    <Description>Interactive Extensions Async Library used to express queries over asynchronous enumerable sequences.</Description>
+    <AssemblyTitle>Interactive Extensions - Async Library</AssemblyTitle>
+    <Authors>Microsoft</Authors>
+    <TargetFrameworks>netcoreapp2.0;netstandard2.1</TargetFrameworks>
+    <PackageTags>Ix;Interactive;Extensions;Enumerable;Asynchronous</PackageTags>
+  </PropertyGroup>
+
+  <ItemGroup>
+    <Compile Include="..\..\System.Interactive.Async\**\*.cs" Exclude="..\..\System.Interactive.Async\obj\**" />
+    <Compile Include="..\..\System.Linq.Async\System\Error.cs" Link="System\Error.cs" />
+    <Compile Include="..\..\System.Linq.Async\System\Linq\AsyncIterator.cs" Link="System\Linq\AsyncIterator.cs" />
+    <Compile Include="..\..\System.Linq.Async\System\Linq\Set.cs" Link="System\Linq\Set.cs" />
+    <Compile Include="..\..\System.Linq.Async\System\Strings.cs" Link="System\Strings.cs" />
+  </ItemGroup>
+
+  <ItemGroup>
+    <ProjectReference Include="..\System.Linq.Async.Ref\System.Linq.Async.Ref.csproj" />
+  </ItemGroup>
+  
+</Project>

+ 23 - 0
Ix.NET/Source/refs/System.Linq.Async.Queryable.Ref/System.Linq.Async.Queryable.Ref.csproj

@@ -0,0 +1,23 @@
+<Project Sdk="MSBuild.Sdk.Extras">
+
+  <PropertyGroup>
+    <Description>LINQ Standard Query Operators used to express queries over asynchronous enumerable sequences.</Description>
+    <AssemblyTitle>System.Linq.Async.Queryable</AssemblyTitle>
+    <Authors>Microsoft</Authors>
+    <TargetFrameworks>netstandard1.0;netcoreapp2.0;netstandard2.1</TargetFrameworks>
+    <PackageTags>Enumerable;Asynchronous;LINQ</PackageTags>
+  </PropertyGroup>
+
+  <ItemGroup>
+    <PackageReference Include="System.Threading.Tasks.Extensions" Version="4.5.1" />
+  </ItemGroup>
+
+  <ItemGroup>
+    <ProjectReference Include="..\System.Linq.Async.Ref\System.Linq.Async.Ref.csproj" />
+  </ItemGroup>
+
+  <ItemGroup>
+    <Compile Include="..\..\System.Linq.Async.Queryable\**\*.cs" Exclude="..\..\System.Linq.Async.Queryable\obj\**" />
+  </ItemGroup>
+  
+</Project>

+ 19 - 0
Ix.NET/Source/refs/System.Linq.Async.Ref/System.Linq.Async.Ref.csproj

@@ -0,0 +1,19 @@
+<Project Sdk="MSBuild.Sdk.Extras">
+
+  <PropertyGroup>
+    <Description>LINQ Standard Query Operators used to express queries over asynchronous enumerable sequences.</Description>
+    <AssemblyTitle>System.Linq.Async</AssemblyTitle>
+    <Authors>Microsoft</Authors>
+    <TargetFrameworks>netstandard1.0;netcoreapp2.0;netstandard2.1</TargetFrameworks>
+    <PackageTags>Enumerable;Asynchronous;LINQ</PackageTags>
+  </PropertyGroup>
+
+  <ItemGroup>
+    <PackageReference Include="System.Threading.Tasks.Extensions" Version="4.5.1" />
+  </ItemGroup>
+
+  <ItemGroup>
+    <Compile Include="..\..\System.Linq.Async\**\*.cs" Exclude="..\..\System.Linq.Async\obj\**" />
+  </ItemGroup>
+  
+</Project>