Parallel.ForEachAsync 保持排序顺序

问题描述 投票:0回答:2

我正在尝试使用

Parallel.ForEachAsync
执行文件上传,它可以工作,但会丢失排序顺序。有什么方法可以同步排序顺序或源列表和目标列表吗?

await Parallel.ForEachAsync(model.DestinationFiles,
    new ParallelOptions { MaxDegreeOfParallelism = 20 }, async (file, CancellationToken) =>
    {
        var storeAsync = await _fileServerService.Init(displayUrl).StoreAsync(file.FileInfo, false, file.OutputFileName);
        convertResultDto.Files.Add(new ConverterConvertResultFile(storeAsync));
    });

之前我使用过 Linq 并行运算符 (PLINQ),它具有

AsOrdered
运算符来处理排序。无论如何,我认为
Parallel.ForEachAsync
更适合在具有 I/O 场景的异步方法中使用?

var storeFiles = model.DestinationFiles.AsParallel().AsOrdered().WithDegreeOfParallelism(50)
    .Select(file => StoreAsync(file.FileInfo, false, file.OutputFileName).GetAwaiter().GetResult())
    .Select(storeFile => new StoreFile
    {
        FileId = storeFile.FileId,
        Url = storeFile.Url,
        OutputFileName = storeFile.OutputFileName,
        Size = storeFile.Size
    });
c# asynchronous task-parallel-library parallel.foreachasync
2个回答
1
投票

在这种情况下,您想要获取一组结果并将它们存储在结果集合中。

Parallel
专为更多操作而设计,结果。对于有结果的操作,您可以使用 PLINQ 进行 CPU 密集型操作,或使用异步并发进行 I/O 密集型操作。不幸的是,没有
Parallel.ForEachAsync
的 PLINQ 等效项,这将是与您当前代码最接近的等效项。

异步并发使用

Task.WhenAll
来获取多个异步操作的结果。它还可以使用
SemaphoreSlim
进行节流。像这样的东西:

var mutex = new SemaphoreSlim(20);
var results = await Task.WhenAll(model.DestinationFiles.Select(async file =>
{
  await mutex.WaitAsync();
  try
  {
    var storeAsync = await _fileServerService.Init(displayUrl).StoreAsync(file.FileInfo, false, file.OutputFileName);
    return new ConverterConvertResultFile(storeAsync);
  }
  finally { mutex.Release(); }
});
convertResultDto.Files.AddRange(results);

但是,如果您混合执行 CPU 密集型和 I/O 密集型操作,那么您可能会想继续使用

ForEachAsync
。在这种情况下,您可以首先在目标集合中创建条目,然后使用索引执行每个操作,以便它知道将它们存储在哪里:

// This code assumes convertResultDto.Files is empty at this point.
var count = model.DestinationFiles.Count;
convertResultDto.Files.AddRange(Enumerable.Repeat<ConverterConvertResultFile>(null!, count));
await Parallel.ForEachAsync(
    model.DestinationFiles.Select((file, i) => (file, i)),
    new ParallelOptions { MaxDegreeOfParallelism = 20 },
    async item =>
    {
      var (file, i) = item;
      var storeAsync = await _fileServerService.Init(displayUrl).StoreAsync(file.FileInfo, false, file.OutputFileName);
      convertResultDto.Files[i] = new ConverterConvertResultFile(storeAsync);
    });

0
投票

尝试按 OutputFileName 顺序进行处理: 等待 Parallel.ForEachAsync(model.DestinationFiles.OrderBy(f => f.OutputFileName),

© www.soinside.com 2019 - 2024. All rights reserved.