Browse Source

Async variants of ForEachAsync.

Bart De Smet 8 years ago
parent
commit
91dc1f106a
1 changed files with 79 additions and 0 deletions
  1. 79 0
      Ix.NET/Source/System.Interactive.Async/ForEach.cs

+ 79 - 0
Ix.NET/Source/System.Interactive.Async/ForEach.cs

@@ -90,6 +90,66 @@ namespace System.Linq
             return ForEachAsync_(source, action, cancellationToken);
         }
 
+        public static Task ForEachAsync<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, Task> action)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (action == null)
+                throw new ArgumentNullException(nameof(action));
+
+            return ForEachAsync(source, action, CancellationToken.None);
+        }
+
+        public static Task ForEachAsync<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, int, Task> action)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (action == null)
+                throw new ArgumentNullException(nameof(action));
+
+            return ForEachAsync(source, action, CancellationToken.None);
+        }
+
+        public static Task ForEachAsync<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, Task> action, CancellationToken cancellationToken)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (action == null)
+                throw new ArgumentNullException(nameof(action));
+
+            return source.ForEachAsync((x, i, ct) => action(x), cancellationToken);
+        }
+
+        public static Task ForEachAsync<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, Task> action, CancellationToken cancellationToken)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (action == null)
+                throw new ArgumentNullException(nameof(action));
+
+            return source.ForEachAsync((x, i, ct) => action(x, ct), cancellationToken);
+        }
+
+        public static Task ForEachAsync<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, int, Task> action, CancellationToken cancellationToken)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (action == null)
+                throw new ArgumentNullException(nameof(action));
+
+            return source.ForEachAsync((x, i, ct) => action(x, i), cancellationToken);
+        }
+
+        public static Task ForEachAsync<TSource>(this IAsyncEnumerable<TSource> source, Func<TSource, int, CancellationToken, Task> action, CancellationToken cancellationToken)
+        {
+            if (source == null)
+                throw new ArgumentNullException(nameof(source));
+            if (action == null)
+                throw new ArgumentNullException(nameof(action));
+
+            return ForEachAsync_(source, action, cancellationToken);
+        }
+
         private static async Task ForEachAsync_<TSource>(IAsyncEnumerable<TSource> source, Action<TSource, int> action, CancellationToken cancellationToken)
         {
             var index = 0;
@@ -108,5 +168,24 @@ namespace System.Linq
                 await e.DisposeAsync().ConfigureAwait(false);
             }
         }
+
+        private static async Task ForEachAsync_<TSource>(IAsyncEnumerable<TSource> source, Func<TSource, int, CancellationToken, Task> action, CancellationToken cancellationToken)
+        {
+            var index = 0;
+
+            var e = source.GetAsyncEnumerator();
+
+            try
+            {
+                while (await e.MoveNextAsync(cancellationToken).ConfigureAwait(false))
+                {
+                    await action(e.Current, checked(index++), cancellationToken).ConfigureAwait(false);
+                }
+            }
+            finally
+            {
+                await e.DisposeAsync().ConfigureAwait(false);
+            }
+        }
     }
 }