我有一个包含 ID 等基本信息的书籍列表,我需要调用多个外部 API 端点来获取其余信息,例如图像、参考文献、作者简介等。
这是一种性能较差的方法,因为它像同步方式(一次一个)一样检索信息......
foreach (var book in books)
{
book.Images = await GetImagesAsync(book.ID);
book.Refs = await GetLinksReferencesAsync(book.ID);
book.AuthorBio = await GetAuthorBioAsync(book.ID);
}
最终我想要的是获得
async
通话的优势并同时拨打大约 100 个电话。
一项改进是在
Task.WaitAll()
内部添加 foreach
,但唯一的好处是同时执行 3 个请求。但是我怎样才能以更有效的方式使用 async
调用(例如一次 100 个请求)来改进这一点?
我真的很喜欢使用 TPL 的数据流库 来完成这类事情。它允许您将多个异步操作(以及同步操作)链接在一起以形成“管道”,并且具有无数的调整来控制每个阶段的并行度、内存缓冲区大小等。
您可以使用多种类型的“块”来组成管道。也许最简单的是
TransformBlock<T1,T2>
,它采用将 T1
映射到 T2
的函数。还有 ActionBlock<T>
执行操作(但不返回值,因此实际上是管道的终止点),还有更多,例如:TransformManyBlock<T1,T2>
、BatchingBlock<T>
、连接块等,用于更复杂的管道。
对于您自己的示例,您可以按如下方式设置管道。
首先我们定义一些默认选项(例如,我们指定最大缓冲区大小为 100 本书,最大并行度为 15 个并发作业):
var defaultOptions = new ExecutionDataflowBlockOptions
{
BoundedCapacity = 100,
MaxDegreeOfParallelism = 15
};
现在为前 2 个
TransformBlock<Book,Book>
作业定义一个 GetXAsync
,并为最后一个作业定义一个 ActionBlock,以便每次使用默认选项终止我们的管道:
var getImagesBlock = new TransformBlock<Book, Book>(async b =>
{
b.Images = await GetImagesAsync(b.ID);
return b;
}, defaultOptions);
var getLinksBlock = new TransformBlock<Book, Book>(async b =>
{
b.Refs = await GetLinksReferencesAsync(b.ID);
return b;
}, defaultOptions);
var getAuthorBioBlock = new ActionBlock<Book>(async b =>
{
b.AuthorBio = await GetAuthorBioAsync(b.ID);
}, defaultOptions);
现在我们定义一些设置来管理如何链接块(探索这些设置最适合您自己的解决方案!):
var linkOptions = new DataflowLinkOptions
{
PropagateCompletion = true //when an earlier block signals it is 'Complete' and has no more messages the next block completes too after it has finished any existing messages
};
然后我们将所有 3 个块连接起来以制作管道:
getImagesBlock.LinkTo(getImagesBlock, linkOptions);
getLinksBlock.LinkTo(getAuthorBioBlock, linkOptions);
现在我们需要做的就是将每本书传递到管道的开头:
foreach (var book in books)
{
getImagesBlock.Post(book); // or we could use SendAsync if this is inside an async method
}
向第一个区块发出信号,表明我们已完成发送书籍:
getImagesBlock.Complete();
然后等待最后一个块处理完成:
getAuthorBioBlock.Completion.Wait(); // or await getAuthorBioBlock.Completion; if inside an async method
我认为,一旦您习惯了数据流选项,就可以为像这样的大量并行操作提供一种自然、易于使用且广泛适用的解决方案。我建议花时间学习如何使用它。它确实使此类工作更容易管理和优化。
顺便说一句需要注意的几点:
book
的每个操作都必须按顺序执行,就像OP中的情况一样。试试这个:
foreach (var book in books)
{
var imagesTask = GetImagesAsync(book.ID);
var refsTask = GetLinksReferencesAsync(book.ID);
var authorTask = GetAuthorBioAsync(book.ID);
Task.WaitAll(imagesTask, refsTask, authorTask);
book.Images = imagesTask.Result;
book.Refs = refsTask.Result;
book.AuthorBio = authorTask.Result;
}
在这种方法中,三个异步任务同时执行。
此时发出 50 x 3 请求...需要考虑的一件事是检查错误,以及发生错误时该怎么办,比如应该继续请求还是停止?
var books = Enumerable.Range(1, 100).Select(i => new Book { ID = i }).ToArray();
var tasks = new List<Task>(50);
var max = 50;
var counter = 0;
foreach (var book in books)
{
var bookTask = SetBookInfoAsync(book);
tasks.Add(bookTask);
counter++;
if (counter >= max)
{
var finishTask = await Task.WhenAny(tasks);
tasks.Remove(finishTask);
}
}
await Task.WhenAll(tasks);
foreach (var book in books)
{
Console.WriteLine($"Book Id {book.ID} {book.Images[0]} {book.Refs[0]} {book.AuthorBio}");
}
async Task SetBookInfoAsync(Book book)
{
book.Images = await GetImagesAsync(book.ID);
book.Refs = await GetLinksReferencesAsync(book.ID);
book.AuthorBio = await GetAuthorBioAsync(book.ID);
// you can change this as well to WhenAll() if need it
}
async Task<List<string>> GetImagesAsync(int id)
{
await Task.Delay(Random.Shared.Next(1, 100));
return ["image:" + id];
}
async Task<List<string>> GetLinksReferencesAsync(int id)
{
await Task.Delay(Random.Shared.Next(1, 100));
return ["ref:" + id];
}
async Task<string> GetAuthorBioAsync(int id)
{
await Task.Delay(Random.Shared.Next(1, 100));
return "author:" + id;
}
输出: