Explorar el Código

Clean up ForEachAsync stuff.

Bart De Smet hace 7 años
padre
commit
751176ba4e
Se han modificado 1 ficheros con 47 adiciones y 53 borrados
  1. 47 53
      Ix.NET/Source/System.Linq.Async/System/Linq/Operators/ForEach.cs

+ 47 - 53
Ix.NET/Source/System.Linq.Async/System/Linq/Operators/ForEach.cs

@@ -13,59 +13,39 @@ namespace System.Linq
         public static Task ForEachAsync<TSource>(this IAsyncEnumerable<TSource> source, Action<TSource> action)
         public static Task ForEachAsync<TSource>(this IAsyncEnumerable<TSource> source, Action<TSource> action)
         {
         {
             if (source == null)
             if (source == null)
-            {
                 throw new ArgumentNullException(nameof(source));
                 throw new ArgumentNullException(nameof(source));
-            }
-
             if (action == null)
             if (action == null)
-            {
                 throw new ArgumentNullException(nameof(action));
                 throw new ArgumentNullException(nameof(action));
-            }
 
 
-            return ForEachAsync(source, action, CancellationToken.None);
+            return ForEachAsyncCore(source, action, CancellationToken.None);
         }
         }
 
 
-        public static Task ForEachAsync<TSource>(this IAsyncEnumerable<TSource> source, Action<TSource, int> action)
+        public static Task ForEachAsync<TSource>(this IAsyncEnumerable<TSource> source, Action<TSource> action, CancellationToken cancellationToken)
         {
         {
             if (source == null)
             if (source == null)
-            {
                 throw new ArgumentNullException(nameof(source));
                 throw new ArgumentNullException(nameof(source));
-            }
-
             if (action == null)
             if (action == null)
-            {
                 throw new ArgumentNullException(nameof(action));
                 throw new ArgumentNullException(nameof(action));
-            }
 
 
-            return ForEachAsync(source, action, CancellationToken.None);
+            return ForEachAsyncCore(source, action, cancellationToken);
         }
         }
 
 
-        public static Task ForEachAsync<TSource>(this IAsyncEnumerable<TSource> source, Action<TSource> action, CancellationToken cancellationToken)
+        public static Task ForEachAsync<TSource>(this IAsyncEnumerable<TSource> source, Action<TSource, int> action)
         {
         {
             if (source == null)
             if (source == null)
-            {
                 throw new ArgumentNullException(nameof(source));
                 throw new ArgumentNullException(nameof(source));
-            }
-
             if (action == null)
             if (action == null)
-            {
                 throw new ArgumentNullException(nameof(action));
                 throw new ArgumentNullException(nameof(action));
-            }
 
 
-            return source.ForEachAsync((x, i) => action(x), cancellationToken);
+            return ForEachAsyncCore(source, action, CancellationToken.None);
         }
         }
 
 
         public static Task ForEachAsync<TSource>(this IAsyncEnumerable<TSource> source, Action<TSource, int> action, CancellationToken cancellationToken)
         public static Task ForEachAsync<TSource>(this IAsyncEnumerable<TSource> source, Action<TSource, int> action, CancellationToken cancellationToken)
         {
         {
             if (source == null)
             if (source == null)
-            {
                 throw new ArgumentNullException(nameof(source));
                 throw new ArgumentNullException(nameof(source));
-            }
-
             if (action == null)
             if (action == null)
-            {
                 throw new ArgumentNullException(nameof(action));
                 throw new ArgumentNullException(nameof(action));
-            }
 
 
             return ForEachAsyncCore(source, action, cancellationToken);
             return ForEachAsyncCore(source, action, cancellationToken);
         }
         }
@@ -73,61 +53,41 @@ namespace System.Linq
         public static Task ForEachAsync<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, Task> action)
         public static Task ForEachAsync<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, Task> action)
         {
         {
             if (source == null)
             if (source == null)
-            {
                 throw new ArgumentNullException(nameof(source));
                 throw new ArgumentNullException(nameof(source));
-            }
-
             if (action == null)
             if (action == null)
-            {
                 throw new ArgumentNullException(nameof(action));
                 throw new ArgumentNullException(nameof(action));
-            }
 
 
-            return ForEachAsync(source, action, CancellationToken.None);
+            return ForEachAsyncCore(source, (x, ct) => action(x), CancellationToken.None);
         }
         }
 
 
-        public static Task ForEachAsync<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, int, Task> action)
+        public static Task ForEachAsync<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, Task> action, CancellationToken cancellationToken)
         {
         {
             if (source == null)
             if (source == null)
-            {
                 throw new ArgumentNullException(nameof(source));
                 throw new ArgumentNullException(nameof(source));
-            }
-
             if (action == null)
             if (action == null)
-            {
                 throw new ArgumentNullException(nameof(action));
                 throw new ArgumentNullException(nameof(action));
-            }
 
 
-            return ForEachAsync(source, action, CancellationToken.None);
+            return ForEachAsyncCore(source, (x, ct) => action(x), cancellationToken);
         }
         }
 
 
-        public static Task ForEachAsync<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, Task> action, CancellationToken cancellationToken)
+        public static Task ForEachAsync<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, Task> action, CancellationToken cancellationToken)
         {
         {
             if (source == null)
             if (source == null)
-            {
                 throw new ArgumentNullException(nameof(source));
                 throw new ArgumentNullException(nameof(source));
-            }
-
             if (action == null)
             if (action == null)
-            {
                 throw new ArgumentNullException(nameof(action));
                 throw new ArgumentNullException(nameof(action));
-            }
 
 
-            return source.ForEachAsync((x, i, ct) => action(x), cancellationToken);
+            return ForEachAsyncCore(source, action, cancellationToken);
         }
         }
 
 
-        public static Task ForEachAsync<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, Task> action, CancellationToken cancellationToken)
+        public static Task ForEachAsync<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, int, Task> action)
         {
         {
             if (source == null)
             if (source == null)
-            {
                 throw new ArgumentNullException(nameof(source));
                 throw new ArgumentNullException(nameof(source));
-            }
-
             if (action == null)
             if (action == null)
-            {
                 throw new ArgumentNullException(nameof(action));
                 throw new ArgumentNullException(nameof(action));
-            }
 
 
-            return source.ForEachAsync((x, i, ct) => action(x, ct), cancellationToken);
+            return ForEachAsyncCore(source, (x, i, ct) => action(x, i), CancellationToken.None);
         }
         }
 
 
         public static Task ForEachAsync<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, int, Task> action, CancellationToken cancellationToken)
         public static Task ForEachAsync<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, int, Task> action, CancellationToken cancellationToken)
@@ -137,7 +97,7 @@ namespace System.Linq
             if (action == null)
             if (action == null)
                 throw new ArgumentNullException(nameof(action));
                 throw new ArgumentNullException(nameof(action));
 
 
-            return source.ForEachAsync((x, i, ct) => action(x, i), cancellationToken);
+            return ForEachAsyncCore(source, (x, i, ct) => action(x, i), cancellationToken);
         }
         }
 
 
         public static Task ForEachAsync<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, int, CancellationToken, Task> action, CancellationToken cancellationToken)
         public static Task ForEachAsync<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, int, CancellationToken, Task> action, CancellationToken cancellationToken)
@@ -150,6 +110,40 @@ namespace System.Linq
             return ForEachAsyncCore(source, action, cancellationToken);
             return ForEachAsyncCore(source, action, cancellationToken);
         }
         }
 
 
+        private static async Task ForEachAsyncCore<TSource>(IAsyncEnumerable<TSource> source, Action<TSource> action, CancellationToken cancellationToken)
+        {
+            var e = source.GetAsyncEnumerator(cancellationToken);
+
+            try
+            {
+                while (await e.MoveNextAsync().ConfigureAwait(false))
+                {
+                    action(e.Current);
+                }
+            }
+            finally
+            {
+                await e.DisposeAsync().ConfigureAwait(false);
+            }
+        }
+
+        private static async Task ForEachAsyncCore<TSource>(IAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, Task> action, CancellationToken cancellationToken)
+        {
+            var e = source.GetAsyncEnumerator(cancellationToken);
+
+            try
+            {
+                while (await e.MoveNextAsync().ConfigureAwait(false))
+                {
+                    await action(e.Current, cancellationToken).ConfigureAwait(false);
+                }
+            }
+            finally
+            {
+                await e.DisposeAsync().ConfigureAwait(false);
+            }
+        }
+
         private static async Task ForEachAsyncCore<TSource>(IAsyncEnumerable<TSource> source, Action<TSource, int> action, CancellationToken cancellationToken)
         private static async Task ForEachAsyncCore<TSource>(IAsyncEnumerable<TSource> source, Action<TSource, int> action, CancellationToken cancellationToken)
         {
         {
             var index = 0;
             var index = 0;