我目前正在尝试实现一个文件搜索器,它可以批量生成找到的文件,以便可以同时处理它。但是当我的问题产生并且我将其传递给任务时就出现了。它永远不会启动多个任务,而是等待上一个任务完成后再启动下一个任务,我希望它启动我的 SemaphoreSlim 允许的尽可能多的任务。我对yieling很陌生,不确定问题是否是yield,因为它会等到前一个yield iter完成,即使它被传递给task.run()。
public async Task RunAsync(CancellationToken cancellationToken)
{
var tasks = new List<Task>();
var index = 0;
using var semaphore = new SemaphoreSlim(8, 16);
foreach (var items in FileSearcher.EnumerateFilesRecursively(@"E:/nestedAAAA", batchSize, cancellationToken))
{
await semaphore.WaitAsync();
try
{
tasks.Add(StartTasksAsync(items, index, cancellationToken).ContinueWith(task =>
{
semaphore.Release();
tasks.Remove(task);
}));
index++;
}
catch(Exception ex)
{
_logger.LogError(ex, "Error in HarvestEngine RunAsync");
semaphore.Release();
}
}
await Task.WhenAll(tasks);
}
private async Task StartTasksAsync(
IEnumerable<string> items,
int index,
CancellationToken cancellationToken)
{
_logger.LogInformation($"[Task {index}] has started.....");
var databaseService = _serviceProvider.GetRequiredService<DatabaseService>();
var collectedSensitiveFiles = new List<FileInformation>();
foreach (var item in items)
{
_logger.LogInformation($"{index} has found {item}");
}
await Task.Delay(1000);
_logger.LogInformation($"[Task {index}] is done.....");
}
如果我将task.delay设置为10秒,它会在下一个任务开始之前等待10秒,即使task.delay位于我新启动的任务中,并且应该只让我的新任务休眠,而不是这样做在下一个任务开始之前等待所需的时间。所以如果我放 1 分钟,下次启动之前需要 1 分钟等,这对我来说没有任何意义(除非有一些我不知道的东西)。
这就是我找到文件的方式:
internal static IEnumerable<IEnumerable<string>> EnumerateFilesRecursively(
string directory,
int batchSize,
CancellationToken cancellationToken)
{
Stack<string> stack = new Stack<string>();
List<string> currentFoundFiles = new List<string>();
stack.Push(directory);
while (stack.Count > 0)
{
cancellationToken.ThrowIfCancellationRequested();
string currentDir = stack.Pop();
try
{
foreach (string subDir in Directory.GetDirectories(currentDir))
stack.Push(subDir);
}
catch (UnauthorizedAccessException)
{
Log.Error("File Searcher: UnauthorizedAccessException");
}
catch (DirectoryNotFoundException)
{
Log.Error("File Searcher: UnauthorizedAccessException");
}
try
{
var files = Directory.GetFiles(currentDir, "*")
.Where(file => allowedExtensions.Contains(Path.GetExtension(file)))
.ToList();
currentFoundFiles.AddRange(files);
}
catch (UnauthorizedAccessException)
{
Log.Error("File Searcher 2: UnauthorizedAccessException");
continue;
}
catch (DirectoryNotFoundException)
{
Log.Error("File Searcher 2: UnauthorizedAccessException");
continue;
}
if (currentFoundFiles.Count < batchSize) continue;
yield return currentFoundFiles;
currentFoundFiles.Clear();
}
if(!currentFoundFiles.IsNullOrEmpty())
yield return currentFoundFiles;
}
** 已更新 **
所以我将其简化为并添加了Task.Yield()。 令人困惑的是我的任务中的 foreach。run 只打印我上次创建的任务中的所有项目,而所有以前的任务只打印 IEnumberable 集合中的第一个项目
public async Task RunAsync(CancellationToken cancellationToken)
{
var tasks = new List<Task>();
var index = 0;
using var semaphore = new SemaphoreSlim(8, 16);
foreach (var items in FileSearcher.EnumerateFilesRecursively(@"E:/nestedAAAA", batchSize, cancellationToken))
{
try
{
tasks.Add(Task.Run(async () =>
{
await Task.Yield();
_logger.LogInformation($"[Task {index}] has started.....");
foreach (var item in items)
{
_logger.LogInformation($"{index} has found {item}");
}
_logger.LogInformation($"[Task {index}] is done.....");
}, cancellationToken));
index++;
}
catch(Exception ex)
{
_logger.LogError(ex, "Error in HarvestEngine RunAsync");
semaphore.Release();
}
}
await Task.WhenAll(tasks);
}
StartTasksAsync
是同步的,直到第一个 await
本身不是同步的;在你的情况下,直到之后foreach
。如果您的目的是让这些实际在后台运行,您可以:
处添加
await Task.Yield();
,以强制其更早变得真正异步
Task.Run
,注意任何捕获的变量(以便您传入预期值,而不是调用回调时将来某个时间的值)
另外:
信号量似乎只保护
tasks
tasks
的方法,因此不需要任何保护 - IMO 完全删除信号量您在 tasks
Remove
Parallel.ForEach
代替,使用
ParallelOptions.MaxDegreeOfParallelism