在 C# 中使用分区程序并行查询带分页的 REST-API

问题描述 投票:0回答:3

我想知道我的方法是否适合并行查询 REST-API,因为一次请求(1000)可以获得多少结果是有限制的。为了加快速度,我想并行执行此操作。

想法是使用分区程序创建一组范围(在我的例子中是 10 个范围)。每个范围都并行执行以查询 API 端点。

结果是任务数组。使用

Task.WhenAll(tasks)
我等到所有任务都完成,然后我必须展平
string[][]
数组以获得一维数组。

任何想法或更好的解决方案?

public async Task<string[]> QueryApiAsParallel() {
    int maximum = 10000; // I don't want to query more than 10000 results,
                         // even I know that are a lot more results
    int rangeSize = 1000; // maximum number that can be received via API

    Task<string[]>[] tasks = Partitioner.Create(0, maximum, rangeSize).AsParallel()
        .Select(async (range, index) => {
        int skip = range.Item1;
        int first = range.Item2 - range.Item1;

        string[] names = await apiClient.GetNames(skip, first);

        return names;
    }).ToArray();

    string[][] tasksCompleted = await Task.WhenAll(tasks);

    string[] flattened = tasksCompleted.SelectMany(x => x).ToArray();

    return flattened;
}

实施可能有点低效。

c# parallel-processing .net-7.0 plinq partitioner
3个回答
2
投票

您可以使用 .NET 6

Task.WhenAll
 API,而不是使用 PLINQ 创建任务并使用 
Parallel.ForEachAsync
等待它们。此方法没有接受
Partitioner<T>
的重载,也没有返回并行操作结果的重载。为了解决这些限制,在下面的示例中,我使用
GetOrderableDynamicPartitions
方法来获取分区,并使用
ConcurrentDictionary<long, string[]>
来存储结果:

public async Task<string[]> QueryApiParallelAsync()
{
    int maximum = 10000;
    int rangeSize = 1000;

    IEnumerable<KeyValuePair<long, Tuple<int, int>>> partitions = Partitioner
        .Create(0, maximum, rangeSize)
        .GetOrderableDynamicPartitions();

    ParallelOptions options = new() { MaxDegreeOfParallelism = 2 };

    ConcurrentDictionary<long, string[]> results = new();

    await Parallel.ForEachAsync(partitions, options, async (entry, ct) =>
    {
        long index = entry.Key;
        int start = entry.Value.Item1;
        int count = entry.Value.Item2 - entry.Value.Item1;

        string[] names = await apiClient.GetNames(start, count);

        results.TryAdd(index, names);
    }).ConfigureAwait(false);

    return results.OrderBy(e => e.Key).SelectMany(e => e.Value).ToArray();
}

这不是按原始顺序收集

Parallel.ForEachAsync
循环结果的唯一方法。你可以看看
MaxDegreeOfParallelism
,您可以尝试各种值,直到达到产生最佳性能的最佳点。您还可以看看这个问题:Factors for determining the degree of parallelism for the ForEachAsync.


0
投票

如果您使用的是.NET 6 及更高版本,您可以尝试使用

Parallel.ForEachAsync()
查看文档

我尝试稍微更改您的代码以适应示例实现并能够比较查询。

这是一个小的dotnet fiddle

然后就可以按照这个例子使用了

public static async Task<string[]> QueryApiAsParallelOptimized() 
{
    int maximum = 10000; 
    int rangeSize = 1000;

    // Concurrent to be thread safe
    var results = new ConcurrentBag<string[]>();

    // Simple parititioning by `.Range` and `.Chunk` methods
    // Eventually you can also use the Partitioner here?
    var partitions = Enumerable.Range(0, maximum);
    var chunks = partitions.Chunk(rangeSize);
        
    var parallelOptions = new ParallelOptions()
    {
        // Set this to a high value for max parallelism
        MaxDegreeOfParallelism = 1000
    };
        
    await Parallel.ForEachAsync(chunks, parallelOptions, async (range, token) =>
    {
        // Implement you logic here. This is just the example implementation
        int skip = range.First();
        string[] names = await CallApiAsync(skip, rangeSize);
        results.Add(names);
    });
    
    // Flatten as you had it before.
    string[] flattened = results.SelectMany(x => x).ToArray();
    return flattened;
}

从 dotnet fiddle 来看,它快了大约 20%。


0
投票

我认为,如果您首先将您的操作视为请求单个“页面”而不是分区范围,那么您可以简化事情。

我怀疑您可能没有从这里的并行性中获得太多价值:您的大部分时间可能都花在等待 I/O 完成上。虽然很容易添加对

.AsParallel().WithDegreeOfParallelism(...)
的调用来测试该假设。

int maxPages = (int)Math.Ceiling(maximum/(double)rangeSize);

Task<string[]>[] tasks = Enumerable.Range(0, maxPages)
    .Select(async pageNumber => await apiClient.GetNames(pageNumber * rangeSize, rangeSize))
    .ToArray();

请注意,如果最大值不是您的页面大小的倍数,这可能会超过您想要的最大值。如果这是一个问题,那么用更多的数学来解决并不难。但我怀疑,如果你考虑你的领域,你可能更多地考虑首先要发出多少请求,然后从中逆向工程

maximum
——你可以避免的步骤如果您首先将
maxPages
rangeSize
作为您的起始输入。

你没有提到你访问的 API 是否对允许的并发请求数有限制。由于这种方法会在第一次调用

.ToArray()
时立即发送您的所有请求,因此您需要确保 API 不会开始拒绝或不成比例地限制您的请求。如果您需要限制自己的请求,使用
ForEachAsync
可能是最简单的方法,如 Theodor Zoulias 的回答 中所述。

© www.soinside.com 2019 - 2024. All rights reserved.