处理多个异步调用

问题描述 投票:0回答:3

我有一个包含 ID 等基本信息的书籍列表,我需要调用多个外部 API 端点来获取其余信息,例如图像、参考文献、作者简介等。

这是一种性能较差的方法,因为它像同步方式(一次一个)一样检索信息......

foreach (var book in books)
{
    book.Images = await GetImagesAsync(book.ID);
    book.Refs = await GetLinksReferencesAsync(book.ID);
    book.AuthorBio = await GetAuthorBioAsync(book.ID);
}

最终我想要的是获得

async
通话的优势并同时拨打大约 100 个电话。

一项改进是在

Task.WaitAll()
内部添加
foreach
,但唯一的好处是同时执行 3 个请求。但是我怎样才能以更有效的方式使用
async
调用(例如一次 100 个请求)来改进这一点?

c# asynchronous parallel-processing task
3个回答
0
投票

我真的很喜欢使用 TPL 的数据流库 来完成这类事情。它允许您将多个异步操作(以及同步操作)链接在一起以形成“管道”,并且具有无数的调整来控制每个阶段的并行度、内存缓冲区大小等。

您可以使用多种类型的“块”来组成管道。也许最简单的是

TransformBlock<T1,T2>
,它采用将
T1
映射到
T2
的函数。还有
ActionBlock<T>
执行操作(但不返回值,因此实际上是管道的终止点),还有更多,例如:
TransformManyBlock<T1,T2>
BatchingBlock<T>
、连接块等,用于更复杂的管道。

对于您自己的示例,您可以按如下方式设置管道。

首先我们定义一些默认选项(例如,我们指定最大缓冲区大小为 100 本书,最大并行度为 15 个并发作业):

var defaultOptions = new ExecutionDataflowBlockOptions
{
    BoundedCapacity = 100,
    MaxDegreeOfParallelism = 15
};

现在为前 2 个

TransformBlock<Book,Book>
作业定义一个
GetXAsync
,并为最后一个作业定义一个 ActionBlock,以便每次使用默认选项终止我们的管道:

var getImagesBlock = new TransformBlock<Book, Book>(async b =>
{
    b.Images = await GetImagesAsync(b.ID);
    return b;
}, defaultOptions);

var getLinksBlock = new TransformBlock<Book, Book>(async b =>
{
    b.Refs = await GetLinksReferencesAsync(b.ID);
    return b;
}, defaultOptions);

var getAuthorBioBlock = new ActionBlock<Book>(async b =>
{
    b.AuthorBio = await GetAuthorBioAsync(b.ID);
}, defaultOptions);

现在我们定义一些设置来管理如何链接块(探索这些设置最适合您自己的解决方案!):

var linkOptions = new DataflowLinkOptions
{
    PropagateCompletion = true //when an earlier block signals it is 'Complete' and has no more messages the next block completes too after it has finished any existing messages
};

然后我们将所有 3 个块连接起来以制作管道:

getImagesBlock.LinkTo(getImagesBlock, linkOptions);
getLinksBlock.LinkTo(getAuthorBioBlock, linkOptions);

现在我们需要做的就是将每本书传递到管道的开头:

foreach (var book in books)
{
    getImagesBlock.Post(book); // or we could use SendAsync if this is inside an async method
}

向第一个区块发出信号,表明我们已完成发送书籍:

getImagesBlock.Complete();

然后等待最后一个块处理完成:

getAuthorBioBlock.Completion.Wait(); // or await getAuthorBioBlock.Completion; if inside an async method

我认为,一旦您习惯了数据流选项,就可以为像这样的大量并行操作提供一种自然、易于使用且广泛适用的解决方案。我建议花时间学习如何使用它。它确实使此类工作更容易管理和优化。

顺便说一句需要注意的几点:

  • 我假设对给定
    book
    的每个操作都必须按顺序执行,就像OP中的情况一样。
  • 没有什么可以阻止我们在每个块上使用不同的缓冲区大小或并行度。如果(通常是这种情况)不同的块具有不同类型或成本的操作,那么我们可以根据其自身的性能特征来优化每个步骤。

0
投票

试试这个:

foreach (var book in books)
{
    var imagesTask = GetImagesAsync(book.ID);
    var refsTask = GetLinksReferencesAsync(book.ID);
    var authorTask = GetAuthorBioAsync(book.ID);

    Task.WaitAll(imagesTask, refsTask, authorTask);

    book.Images = imagesTask.Result;
    book.Refs = refsTask.Result;
    book.AuthorBio = authorTask.Result;
}

在这种方法中,三个异步任务同时执行。


0
投票

此时发出 50 x 3 请求...需要考虑的一件事是检查错误,以及发生错误时该怎么办,比如应该继续请求还是停止?

var books = Enumerable.Range(1, 100).Select(i => new Book { ID = i }).ToArray();
var tasks = new List<Task>(50);
var max = 50;
var counter = 0;
foreach (var book in books)
{
    var bookTask = SetBookInfoAsync(book);
    tasks.Add(bookTask);
    counter++;
    if (counter >= max)
    {
        var finishTask = await Task.WhenAny(tasks);
        tasks.Remove(finishTask);
    }
}
await Task.WhenAll(tasks);
foreach (var book in books)
{
    Console.WriteLine($"Book Id {book.ID} {book.Images[0]} {book.Refs[0]} {book.AuthorBio}");
}

async Task SetBookInfoAsync(Book book)
{
    book.Images = await GetImagesAsync(book.ID);
    book.Refs = await GetLinksReferencesAsync(book.ID);
    book.AuthorBio = await GetAuthorBioAsync(book.ID);
    // you can change this as well to WhenAll() if need it
}
async Task<List<string>> GetImagesAsync(int id)
{
    await Task.Delay(Random.Shared.Next(1, 100));
    return ["image:" + id];
}
async Task<List<string>> GetLinksReferencesAsync(int id)
{
    await Task.Delay(Random.Shared.Next(1, 100));
    return ["ref:" + id];
}
async Task<string> GetAuthorBioAsync(int id)
{
    await Task.Delay(Random.Shared.Next(1, 100));
    return "author:" + id;
}

输出:

© www.soinside.com 2019 - 2024. All rights reserved.