Browse Source

新增SelectManyAsync

懒得勤快 7 months ago
parent
commit
f9e62f79ad

+ 100 - 0
Masuit.Tools.Abstractions/Extensions/BaseType/IEnumerableExtensions.cs

@@ -852,6 +852,106 @@ public static class IEnumerableExtensions
         return results;
     }
 
+    /// <summary>
+    /// 异步SelectMany
+    /// </summary>
+    /// <typeparam name="T"></typeparam>
+    /// <typeparam name="TResult"></typeparam>
+    /// <param name="source"></param>
+    /// <param name="selector"></param>
+    /// <returns></returns>
+    public static Task<IEnumerable<TResult>> SelectManyAsync<T, TResult>(this IEnumerable<T> source, Func<T, Task<IEnumerable<TResult>>> selector)
+    {
+        return Task.WhenAll(source.Select(selector)).ContinueWith(t => t.Result.SelectMany(_ => _));
+    }
+
+    /// <summary>
+    /// 异步SelectMany
+    /// </summary>
+    /// <typeparam name="T"></typeparam>
+    /// <typeparam name="TResult"></typeparam>
+    /// <param name="source"></param>
+    /// <param name="selector"></param>
+    /// <returns></returns>
+    public static Task<IEnumerable<TResult>> SelectManyAsync<T, TResult>(this IEnumerable<T> source, Func<T, int, Task<IEnumerable<TResult>>> selector)
+    {
+        return Task.WhenAll(source.Select(selector)).ContinueWith(t => t.Result.SelectMany(_ => _));
+    }
+
+    /// <summary>
+    /// 异步SelectMany
+    /// </summary>
+    /// <typeparam name="T"></typeparam>
+    /// <typeparam name="TResult"></typeparam>
+    /// <param name="source"></param>
+    /// <param name="selector"></param>
+    /// <param name="maxParallelCount">最大并行数</param>
+    /// <param name="cancellationToken"></param>
+    /// <returns></returns>
+    public static async Task<List<TResult>> SelectManyAsync<T, TResult>(this IEnumerable<T> source, Func<T, Task<IEnumerable<TResult>>> selector, int maxParallelCount, CancellationToken cancellationToken = default)
+    {
+        var results = new List<TResult>();
+        var tasks = new List<Task<IEnumerable<TResult>>>();
+        foreach (var item in source)
+        {
+            if (cancellationToken.IsCancellationRequested)
+            {
+                break;
+            }
+
+            var task = selector(item);
+            tasks.Add(task);
+            if (tasks.Count >= maxParallelCount)
+            {
+                await Task.WhenAny(tasks);
+                var completedTasks = tasks.Where(t => t.IsCompleted).ToArray();
+                results.AddRange(completedTasks.SelectMany(t => t.Result));
+                tasks.RemoveWhere(t => completedTasks.Contains(t));
+            }
+        }
+
+        results.AddRange(await Task.WhenAll(tasks).ContinueWith(t => t.Result.SelectMany(_ => _), cancellationToken));
+        return results;
+    }
+
+    /// <summary>
+    /// 异步SelectMany
+    /// </summary>
+    /// <typeparam name="T"></typeparam>
+    /// <typeparam name="TResult"></typeparam>
+    /// <param name="source"></param>
+    /// <param name="selector"></param>
+    /// <param name="maxParallelCount">最大并行数</param>
+    /// <param name="cancellationToken"></param>
+    /// <returns></returns>
+    public static async Task<List<TResult>> SelectManyAsync<T, TResult>(this IEnumerable<T> source, Func<T, int, Task<IEnumerable<TResult>>> selector, int maxParallelCount, CancellationToken cancellationToken = default)
+    {
+        var results = new List<TResult>();
+        var tasks = new List<Task<IEnumerable<TResult>>>();
+        int index = 0;
+        foreach (var item in source)
+        {
+            if (cancellationToken.IsCancellationRequested)
+            {
+                break;
+            }
+
+            var task = selector(item, index);
+            tasks.Add(task);
+            Interlocked.Add(ref index, 1);
+            if (tasks.Count >= maxParallelCount)
+            {
+                await Task.WhenAny(tasks);
+                var completedTasks = tasks.Where(t => t.IsCompleted).ToArray();
+                results.AddRange(completedTasks.SelectMany(t => t.Result));
+                tasks.RemoveWhere(t => completedTasks.Contains(t));
+            }
+        }
+
+        results.AddRange(await Task.WhenAll(tasks).ContinueWith(t => t.Result.SelectMany(_ => _), cancellationToken));
+        return results;
+    }
+
     /// <summary>
     /// 异步For
     /// </summary>

+ 1 - 1
Masuit.Tools.Excel/Masuit.Tools.Excel.csproj

@@ -37,7 +37,7 @@
       </None>
     </ItemGroup>
     <ItemGroup>
-        <PackageReference Include="EPPlus" Version="7.6.0" />
+        <PackageReference Include="EPPlus" Version="7.6.1" />
         <PackageReference Include="Microsoft.SourceLink.GitHub" Version="8.0.0" PrivateAssets="All" />
     </ItemGroup>
     <ItemGroup>

+ 1 - 1
Masuit.Tools.NoSQL.MongoDBClient/Masuit.Tools.NoSQL.MongoDBClient.csproj

@@ -38,7 +38,7 @@
   </PropertyGroup>
 
   <ItemGroup>
-    <PackageReference Include="MongoDB.Driver" Version="3.1.0" />
+    <PackageReference Include="MongoDB.Driver" Version="3.2.1" />
   </ItemGroup>
 
 </Project>

+ 1 - 1
NetCoreTest/NetCoreTest.csproj

@@ -6,7 +6,7 @@
     <ConcurrentGarbageCollection>false</ConcurrentGarbageCollection>
   </PropertyGroup>
   <ItemGroup>
-    <PackageReference Include="Swashbuckle.AspNetCore" Version="7.2.0" />
+    <PackageReference Include="Swashbuckle.AspNetCore" Version="7.3.1" />
   </ItemGroup>
   <ItemGroup>
     <Folder Include="Controllers\" />

+ 2 - 2
Test/Masuit.Tools.Abstractions.Test/Masuit.Tools.Abstractions.Test.csproj

@@ -13,10 +13,10 @@
   </ItemGroup>
 
   <ItemGroup>
-    <PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.12.0" />
+    <PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.13.0" />
     <PackageReference Include="Moq" Version="4.20.72" />
     <PackageReference Include="xunit" Version="2.9.3" />
-    <PackageReference Include="xunit.runner.visualstudio" Version="3.0.1">
+    <PackageReference Include="xunit.runner.visualstudio" Version="3.0.2">
       <PrivateAssets>all</PrivateAssets>
       <IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
     </PackageReference>

+ 1 - 1
Test/Masuit.Tools.AspNetCore.ResumeFileResults.WebTest/Masuit.Tools.AspNetCore.ResumeFileResults.WebTest.csproj

@@ -23,7 +23,7 @@
   </ItemGroup>
 
   <ItemGroup>
-    <PackageReference Include="Swashbuckle.AspNetCore" Version="7.2.0" />
+    <PackageReference Include="Swashbuckle.AspNetCore" Version="7.3.1" />
   </ItemGroup>
 
   <ItemGroup>

+ 5 - 5
Test/Masuit.Tools.Core.Test/Masuit.Tools.Core.Test.csproj

@@ -9,12 +9,12 @@
   </PropertyGroup>
 
   <ItemGroup>
-    <PackageReference Include="Microsoft.AspNetCore.TestHost" Version="9.0.1" />
-    <PackageReference Include="Microsoft.EntityFrameworkCore.InMemory" Version="9.0.1" />
-    <PackageReference Include="Microsoft.Extensions.Hosting" Version="9.0.1" />
-    <PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.12.0" />
+    <PackageReference Include="Microsoft.AspNetCore.TestHost" Version="9.0.2" />
+    <PackageReference Include="Microsoft.EntityFrameworkCore.InMemory" Version="9.0.2" />
+    <PackageReference Include="Microsoft.Extensions.Hosting" Version="9.0.2" />
+    <PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.13.0" />
     <PackageReference Include="xunit" Version="2.9.3" />
-    <PackageReference Include="xunit.runner.visualstudio" Version="3.0.1">
+    <PackageReference Include="xunit.runner.visualstudio" Version="3.0.2">
       <PrivateAssets>all</PrivateAssets>
       <IncludeAssets>runtime; build; native; contentfiles; analyzers</IncludeAssets>
     </PackageReference>

+ 1 - 1
Test/Masuit.Tools.Test/Masuit.Tools.Test.csproj

@@ -115,7 +115,7 @@
       <Version>2.0.3</Version>
     </PackageReference>
     <PackageReference Include="xunit.analyzers">
-      <Version>1.19.0</Version>
+      <Version>1.20.0</Version>
       <IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
       <PrivateAssets>all</PrivateAssets>
     </PackageReference>