yield 中的 C# 批处理不会同时启动

问题描述 投票:0回答:1

我目前正在尝试实现一个文件搜索器,它可以批量生成找到的文件,以便可以同时处理它。但是当我的问题产生并且我将其传递给任务时就出现了。它永远不会启动多个任务,而是等待上一个任务完成后再启动下一个任务,我希望它启动我的 SemaphoreSlim 允许的尽可能多的任务。我对yieling很陌生,不确定问题是否是yield,因为它会等到前一个yield iter完成,即使它被传递给task.run()。

public async Task RunAsync(CancellationToken cancellationToken)
{
    var tasks = new List<Task>();
    var index = 0;
    using var semaphore = new SemaphoreSlim(8, 16);
    foreach (var items in FileSearcher.EnumerateFilesRecursively(@"E:/nestedAAAA", batchSize, cancellationToken))
    {
        await semaphore.WaitAsync();
        try
        {
            tasks.Add(StartTasksAsync(items, index, cancellationToken).ContinueWith(task =>
            {
                semaphore.Release();
                tasks.Remove(task);
            }));
            index++;
        }
        catch(Exception ex)
        {
            _logger.LogError(ex, "Error in HarvestEngine RunAsync");
            semaphore.Release();
        }
    }

    await Task.WhenAll(tasks);
}

private async Task StartTasksAsync(
    IEnumerable<string> items, 
    int index,
    CancellationToken cancellationToken)
{
    _logger.LogInformation($"[Task {index}] has started.....");

    var databaseService = _serviceProvider.GetRequiredService<DatabaseService>();
    var collectedSensitiveFiles = new List<FileInformation>();

    foreach (var item in items)
    {
        _logger.LogInformation($"{index} has found {item}");
    }
    await Task.Delay(1000);

    _logger.LogInformation($"[Task {index}] is done.....");
}

如果我将task.delay设置为10秒,它会在下一个任务开始之前等待10秒,即使task.delay位于我新启动的任务中,并且应该只让我的新任务休眠,而不是这样做在下一个任务开始之前等待所需的时间。所以如果我放 1 分钟,下次启动之前需要 1 分钟等,这对我来说没有任何意义(除非有一些我不知道的东西)。

这就是我找到文件的方式:

internal static IEnumerable<IEnumerable<string>> EnumerateFilesRecursively(
    string directory, 
    int batchSize,
    CancellationToken cancellationToken)
{
    Stack<string> stack = new Stack<string>();
    List<string> currentFoundFiles = new List<string>();
    stack.Push(directory);

    while (stack.Count > 0)
    {
        cancellationToken.ThrowIfCancellationRequested();
        string currentDir = stack.Pop();

        try
        {
            foreach (string subDir in Directory.GetDirectories(currentDir))
                stack.Push(subDir);
        }
        catch (UnauthorizedAccessException)
        {
            Log.Error("File Searcher: UnauthorizedAccessException");
        }
        catch (DirectoryNotFoundException)
        {
            Log.Error("File Searcher: UnauthorizedAccessException");

        }

        try
        {
            var files = Directory.GetFiles(currentDir, "*")
                .Where(file => allowedExtensions.Contains(Path.GetExtension(file)))
                .ToList();

            currentFoundFiles.AddRange(files);
        }
        catch (UnauthorizedAccessException)
        {
            Log.Error("File Searcher 2: UnauthorizedAccessException");
            continue;
        }
        catch (DirectoryNotFoundException)
        {
            Log.Error("File Searcher 2: UnauthorizedAccessException");
            continue;
        }

        if (currentFoundFiles.Count < batchSize) continue;
        yield return currentFoundFiles;
        currentFoundFiles.Clear();
    }

    if(!currentFoundFiles.IsNullOrEmpty())
        yield return currentFoundFiles;
}

** 已更新 **

所以我将其简化为并添加了Task.Yield()。 令人困惑的是我的任务中的 foreach。run 只打印我上次创建的任务中的所有项目,而所有以前的任务只打印 IEnumberable 集合中的第一个项目

    public async Task RunAsync(CancellationToken cancellationToken)
{
    var tasks = new List<Task>();
    var index = 0;
    using var semaphore = new SemaphoreSlim(8, 16);
    foreach (var items in FileSearcher.EnumerateFilesRecursively(@"E:/nestedAAAA", batchSize, cancellationToken))
    {
        try
        {
            tasks.Add(Task.Run(async () =>
            {
                await Task.Yield();
                _logger.LogInformation($"[Task {index}] has started.....");

                foreach (var item in items)
                {
                    _logger.LogInformation($"{index} has found {item}");
                }

                _logger.LogInformation($"[Task {index}] is done.....");
            }, cancellationToken));
            index++;
        }
        catch(Exception ex)
        {
            _logger.LogError(ex, "Error in HarvestEngine RunAsync");
            semaphore.Release();
        }
    }

    await Task.WhenAll(tasks);
}
c# asynchronous task yield concurrently
1个回答
2
投票

StartTasksAsync
是同步的,直到第一个
await
本身不是同步的;在你的情况下,直到之后foreach

如果您的目的是让这些实际在后台运行,您可以:

    在方法的
  1. start
     处添加 
    await Task.Yield();,以强制其更早变得真正异步
  2. 在调用
  3. 方法时使用
    Task.Run
    ,注意任何捕获的变量(以便您传入预期值,而不是调用回调时将来某个时间的值)
  4. 第二个是更明确的背景,但两者都应该有效

另外:

信号量似乎只保护
    tasks
  • ,但该方法是唯一访问
    tasks
    的方法,因此不需要任何保护 - IMO 完全删除信号量
    您在 
  • tasks
  • 完成之前将其删除; IMO 删除
    Remove
    
    
  • 如果信号量的目的是您希望同时运行固定数量的事物:也许使用
Parallel.ForEach

代替,使用

ParallelOptions.MaxDegreeOfParallelism
    

© www.soinside.com 2019 - 2024. All rights reserved.