|
@@ -2,6 +2,7 @@
|
|
|
using System;
|
|
using System;
|
|
|
using System.Collections.Generic;
|
|
using System.Collections.Generic;
|
|
|
using System.Linq;
|
|
using System.Linq;
|
|
|
|
|
+using System.Threading;
|
|
|
using System.Threading.Tasks;
|
|
using System.Threading.Tasks;
|
|
|
|
|
|
|
|
namespace Masuit.Tools
|
|
namespace Masuit.Tools
|
|
@@ -28,17 +29,42 @@ namespace Masuit.Tools
|
|
|
|
|
|
|
|
#region AsyncForEach
|
|
#region AsyncForEach
|
|
|
|
|
|
|
|
- /// <summary>
|
|
|
|
|
- /// 遍历IEnumerable
|
|
|
|
|
- /// </summary>
|
|
|
|
|
- /// <param name="objs"></param>
|
|
|
|
|
- /// <param name="action">回调方法</param>
|
|
|
|
|
- /// <typeparam name="T"></typeparam>
|
|
|
|
|
- public static async void ForEachAsync<T>(this IEnumerable<T> objs, Action<T> action)
|
|
|
|
|
|
|
+ public static async Task ForeachAsync<T>(this IEnumerable<T> source, int maxParallelCount, Func<T, Task> action)
|
|
|
{
|
|
{
|
|
|
- await Task.Run(() => Parallel.ForEach(objs, action));
|
|
|
|
|
- }
|
|
|
|
|
|
|
+ using SemaphoreSlim completeSemphoreSlim = new(1);
|
|
|
|
|
+ using SemaphoreSlim taskCountLimitsemaphoreSlim = new(maxParallelCount);
|
|
|
|
|
+ await completeSemphoreSlim.WaitAsync();
|
|
|
|
|
+ int runningtaskCount = source.Count();
|
|
|
|
|
+
|
|
|
|
|
+ foreach (var item in source)
|
|
|
|
|
+ {
|
|
|
|
|
+ await taskCountLimitsemaphoreSlim.WaitAsync();
|
|
|
|
|
+ Task.Run(async () =>
|
|
|
|
|
+ {
|
|
|
|
|
+ try
|
|
|
|
|
+ {
|
|
|
|
|
+ await action(item).ContinueWith(task =>
|
|
|
|
|
+ {
|
|
|
|
|
+ Interlocked.Decrement(ref runningtaskCount);
|
|
|
|
|
+ if (runningtaskCount == 0)
|
|
|
|
|
+ {
|
|
|
|
|
+ completeSemphoreSlim.Release();
|
|
|
|
|
+ }
|
|
|
|
|
+ });
|
|
|
|
|
+ }
|
|
|
|
|
+ finally
|
|
|
|
|
+ {
|
|
|
|
|
+ taskCountLimitsemaphoreSlim.Release();
|
|
|
|
|
+ }
|
|
|
|
|
+ });
|
|
|
|
|
+ }
|
|
|
|
|
|
|
|
|
|
+ await completeSemphoreSlim.WaitAsync();
|
|
|
|
|
+ }
|
|
|
|
|
+ public static async Task ForeachAsync<T>(this IEnumerable<T> source, Func<T, Task> action)
|
|
|
|
|
+ {
|
|
|
|
|
+ await ForeachAsync(source, source.Count(), action);
|
|
|
|
|
+ }
|
|
|
#endregion AsyncForEach
|
|
#endregion AsyncForEach
|
|
|
|
|
|
|
|
/// <summary>
|
|
/// <summary>
|
|
@@ -176,5 +202,18 @@ namespace Masuit.Tools
|
|
|
set.UnionWith(source.Select(selector));
|
|
set.UnionWith(source.Select(selector));
|
|
|
return set;
|
|
return set;
|
|
|
}
|
|
}
|
|
|
|
|
+
|
|
|
|
|
+ /// <summary>
|
|
|
|
|
+ /// 异步Select
|
|
|
|
|
+ /// </summary>
|
|
|
|
|
+ /// <typeparam name="T"></typeparam>
|
|
|
|
|
+ /// <typeparam name="TResult"></typeparam>
|
|
|
|
|
+ /// <param name="source"></param>
|
|
|
|
|
+ /// <param name="selector"></param>
|
|
|
|
|
+ /// <returns></returns>
|
|
|
|
|
+ public static async Task<IEnumerable<TResult>> SelectAsync<T, TResult>(this IEnumerable<T> source, Func<T, Task<TResult>> selector)
|
|
|
|
|
+ {
|
|
|
|
|
+ return await Task.WhenAll(source.Select(selector));
|
|
|
|
|
+ }
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|