我想知道我的方法是否适合并行查询 REST-API,因为一次请求(1000)可以获得多少结果是有限制的。为了加快速度,我想并行执行此操作。
想法是使用分区程序创建一组范围(在我的例子中是 10 个范围)。每个范围都并行执行以查询 API 端点。
结果是任务数组。使用
Task.WhenAll(tasks)
我等到所有任务都完成,然后我必须展平 string[][]
数组以获得一维数组。
任何想法或更好的解决方案?
public async Task<string[]> QueryApiAsParallel() {
int maximum = 10000; // I don't want to query more than 10000 results,
// even I know that are a lot more results
int rangeSize = 1000; // maximum number that can be received via API
Task<string[]>[] tasks = Partitioner.Create(0, maximum, rangeSize).AsParallel()
.Select(async (range, index) => {
int skip = range.Item1;
int first = range.Item2 - range.Item1;
string[] names = await apiClient.GetNames(skip, first);
return names;
}).ToArray();
string[][] tasksCompleted = await Task.WhenAll(tasks);
string[] flattened = tasksCompleted.SelectMany(x => x).ToArray();
return flattened;
}
实施可能有点低效。
您可以使用 .NET 6
Task.WhenAll
API,而不是使用 PLINQ 创建任务并使用
Parallel.ForEachAsync
等待它们。此方法没有接受 Partitioner<T>
的重载,也没有返回并行操作结果的重载。为了解决这些限制,在下面的示例中,我使用 GetOrderableDynamicPartitions
方法来获取分区,并使用 ConcurrentDictionary<long, string[]>
来存储结果:
public async Task<string[]> QueryApiParallelAsync()
{
int maximum = 10000;
int rangeSize = 1000;
IEnumerable<KeyValuePair<long, Tuple<int, int>>> partitions = Partitioner
.Create(0, maximum, rangeSize)
.GetOrderableDynamicPartitions();
ParallelOptions options = new() { MaxDegreeOfParallelism = 2 };
ConcurrentDictionary<long, string[]> results = new();
await Parallel.ForEachAsync(partitions, options, async (entry, ct) =>
{
long index = entry.Key;
int start = entry.Value.Item1;
int count = entry.Value.Item2 - entry.Value.Item1;
string[] names = await apiClient.GetNames(start, count);
results.TryAdd(index, names);
}).ConfigureAwait(false);
return results.OrderBy(e => e.Key).SelectMany(e => e.Value).ToArray();
}
这不是按原始顺序收集
Parallel.ForEachAsync
循环结果的唯一方法。你可以看看MaxDegreeOfParallelism
,您可以尝试各种值,直到达到产生最佳性能的最佳点。您还可以看看这个问题:Factors for determining the degree of parallelism for the ForEachAsync.
如果您使用的是.NET 6 及更高版本,您可以尝试使用
Parallel.ForEachAsync()
。 查看文档
我尝试稍微更改您的代码以适应示例实现并能够比较查询。
这是一个小的dotnet fiddle
然后就可以按照这个例子使用了
public static async Task<string[]> QueryApiAsParallelOptimized()
{
int maximum = 10000;
int rangeSize = 1000;
// Concurrent to be thread safe
var results = new ConcurrentBag<string[]>();
// Simple parititioning by `.Range` and `.Chunk` methods
// Eventually you can also use the Partitioner here?
var partitions = Enumerable.Range(0, maximum);
var chunks = partitions.Chunk(rangeSize);
var parallelOptions = new ParallelOptions()
{
// Set this to a high value for max parallelism
MaxDegreeOfParallelism = 1000
};
await Parallel.ForEachAsync(chunks, parallelOptions, async (range, token) =>
{
// Implement you logic here. This is just the example implementation
int skip = range.First();
string[] names = await CallApiAsync(skip, rangeSize);
results.Add(names);
});
// Flatten as you had it before.
string[] flattened = results.SelectMany(x => x).ToArray();
return flattened;
}
从 dotnet fiddle 来看,它快了大约 20%。
我认为,如果您首先将您的操作视为请求单个“页面”而不是分区范围,那么您可以简化事情。
我怀疑您可能没有从这里的并行性中获得太多价值:您的大部分时间可能都花在等待 I/O 完成上。虽然很容易添加对
.AsParallel().WithDegreeOfParallelism(...)
的调用来测试该假设。
int maxPages = (int)Math.Ceiling(maximum/(double)rangeSize);
Task<string[]>[] tasks = Enumerable.Range(0, maxPages)
.Select(async pageNumber => await apiClient.GetNames(pageNumber * rangeSize, rangeSize))
.ToArray();
请注意,如果最大值不是您的页面大小的倍数,这可能会超过您想要的最大值。如果这是一个问题,那么用更多的数学来解决并不难。但我怀疑,如果你考虑你的领域,你可能更多地考虑首先要发出多少请求,然后从中逆向工程
maximum
——你可以避免的步骤如果您首先将 maxPages
和 rangeSize
作为您的起始输入。
你没有提到你访问的 API 是否对允许的并发请求数有限制。由于这种方法会在第一次调用
.ToArray()
时立即发送您的所有请求,因此您需要确保 API 不会开始拒绝或不成比例地限制您的请求。如果您需要限制自己的请求,使用 ForEachAsync
可能是最简单的方法,如 Theodor Zoulias 的回答 中所述。