嵌套在Parallel.ForEach中的异步方法。

问题描述 投票:1回答:4

我有一个方法,在其中运行多个异步方法。我必须迭代一个设备列表,并将设备传递给这个方法。我注意到这需要很长的时间来完成,所以我考虑使用 Parallel.ForEach 所以它可以同时对多个设备运行这个进程。

比如说这是我的方法。

public async Task ProcessDevice(Device device) {
    var dev = await _deviceService.LookupDeviceIndbAsNoTracking(device);

    var result = await DoSomething(dev);
    await DoSomething2(dev);
}

那么DoSomething2也会调用一个异步方法。

public async Task DoSomething2(Device dev) {
    foreach(var obj in dev.Objects) {
        await DoSomething3(obj);
    }
}

随着时间的推移,设备列表不断变大,所以这个列表越大,程序完成运行所需要的时间就会越长 ProcessDevice() 针对每个设备。我想一次处理多个设备。所以我一直在研究使用 Parallel.ForEach.

Parallel.ForEach(devices, async device => {
    try {
        await ProcessDevice(device);
    } catch (Exception ex) {
        throw ex;
    }
})

似乎在设备完全处理之前,程序就已经完成了。我也试过创建一个任务列表,然后foreach设备,在该列表中添加一个运行ProcessDevice的新任务,然后等待Task.WhenAll(listOfTasks)。

var listOfTasks = new List<Task>();
foreach(var device in devices) {
    var task = Task.Run(async () => await ProcessDevice(device));
    listOfTasks.Add(task);
}
await Task.WhenAll(listOfTasks);

但似乎在任务被标记为完成前 ProcessDevice() 实际上已经完成运行。

请原谅我对这个问题的无知,因为我是并行处理的新手,不知道是怎么回事。到底发生了什么事情导致这种行为,有没有什么文档可以帮助我更好地理解该怎么做?

c# parallel-processing async-await task-parallel-library parallel.foreach
4个回答
6
投票

你不能混合 asyncParallel.ForEach. 由于你的底层操作是异步的,所以你要使用异步并发,而不是并行。异步并发最简单的表达方式是用 WhenAll:

var listOfTasks = devices.Select(ProcessDevice).ToList();
await Task.WhenAll(listOfTasks);

1
投票

在你最后的例子中,有几个问题。

var listOfTasks = new List<Task>();
foreach (var device in devices)
{
    await  Task.Run(async () => await ProcessDevice(device));
}
await Task.WhenAll(listOfTasks);

做... await Task.Run(async () => await ProcessDevice(device)); 意味着你没有移动到下一个迭代的 foreach 循环,直到前一个循环完成。本质上,你还是在一次一次地做它们。

此外,你并没有向你的任务添加任何任务到 listOfTasks 因此,它仍然是空的,因此 Task.WhenAll(listOfTasks) 瞬间完成,因为没有任务需要等待。

试试这个。

var listOfTasks = new List<Task>();
foreach (var device in devices)
{
    var task = Task.Run(async () => await ProcessDevice(device))
    listOfTasks.Add(task);
}
await Task.WhenAll(listOfTasks);

1
投票

我可以解释一下这个问题 Parallel.ForEach. 一个重要的问题是,当 await 关键词作用于一个不完整的 Task,它 返回. 它将返回自己的不完整 Task 如果方法签名允许的话(如果不是 void). 然后就由调用者来使用这个 Task 对象来等待作业的完成。

但在 Parallel.ForEach 是一个 Action<T>,这是一个 void 方法,这意味着没有 Task 可以返回,这意味着调用者(Parallel.ForEach 在这种情况下)没有办法等到工作完成。

所以,在你的情况下,只要它点击了 await ProcessDevice(device),它返回,什么都不等它完成,所以它开始下一次迭代。到了 Parallel.ForEach 完成了,它所做的就是 开始 所有的任务,但没有等到它们。

所以不要用 Parallel.ForEach 与异步代码。

Stephen的答案更合适。你也可以使用WSC的答案,但对于较大的列表来说,这可能是危险的。一次性创建数百或数千个新线程对你的性能没有帮助。


-1
投票

我不太确定这是否是你所要求的,但我可以举例说明我们如何启动异步进程。

 private readonly Func<Worker> _worker;

    private void StartWorkers(IEnumerable<Props> props){
    Parallel.ForEach(props, timestamp => { _worker.Invoke().Consume(timestamp); });
    }

推荐阅读 并行.ForEach 因为它会为你做一些部分。

© www.soinside.com 2019 - 2024. All rights reserved.