如何在块中异步收集任务+使用具有TCP连接限制的信号量?

问题描述 投票:0回答:1

我有一个很大的(1M)db结果集,我想为其每一行调用REST API。

API可以接受批处理请求,但是我不确定如何对rows生成器进行切片,以便每个任务处理一系列行,例如10。我宁愿不预先读取所有行并坚持使用生成器。

容纳my_function以在一个http请求中发送列表很容易,但是asyncio.gather呢?也许itertools之一可以提供帮助。

请参见下面的通用伪代码进行说明:

async def main(rows):
    async with aiohttp.ClientSession() as session:
        tasks = [my_function(row, session) for row in rows]
        return await asyncio.gather(*tasks)

rows = <generator of database rows>
results = asyncio.run(main(rows))

注意:results很小,基本上是每行的确认值。

旁注,

  • asyncio.gather()可以(有效)处理的任务数量是否有限制?
  • 当前gather()将所有请求/任务加载到内存中,消耗50GB(!)。如何读取和传递行和任务以减少内存使用量?这是asyncio.BoundedSemaphore()用于什么吗?
  • TCP连接限制为500,因为REST Web服务器可以接受那么多。如果信号量起作用,则该值应为什么,即设置信号量> TCPconnections limit是否有意义?

[aiohttpasyncio很棒,但很难遵循-我同意此post

asyncio一直在变化,因此请警惕旧的Stack Overflow答案。其中许多不是最新的最佳实践

编辑

我刚刚尝试使用asyncio.BoundedSemaphore(100),并且内存使用率大致相同(45GB)-不确定它比连接限制有任何好处

python-asyncio aiohttp
1个回答
1
投票

基于信号量的解决方案对大量任务的内存使用无济于事,因为您仍将提前创建所有协程和任务。它们将开始执行,只是其中的大多数会立即被挂起,等待信号量让它们进入。

相反,您可以创建固定数量的工作程序,并通过队列将它们提供给数据库行:

async def worker(queue, session):
    while True:
        row = await queue.get()
        await my_function(row, session)
        # Mark the item as processed, allowing queue.join() to keep
        # track of remaining work and know when everything is done.
        queue.task_done()

async def main(rows):
    N_WORKERS = 50
    queue = asyncio.Queue(N_WORKERS)
    async with aiohttp.ClientSession() as session:
        # create 50 workers and feed them tasks
        workers = [asyncio.create_task(worker(queue, session))
                   for _ in range(N_WORKERS)]
        # Feed the database rows to the workers. The fixed-capacity of the
        # queue ensures that we never hold all rows in the memory at the
        # same time. (When the queue reaches full capacity, this will block
        # until a worker dequeues an item.)
        async for row in rows:
            await queue.put(row)
        # Wait for all enqueued items to be processed.
        await queue.join()
    # The workers are now idly waiting for the next queue item and we
    # no longer need them.
    for worker in workers:
        worker.cancel()

请注意,rows应该是异步生成器。如果它是普通生成器,则可能会阻塞事件循环并成为瓶颈。如果您的数据库不支持异步接口,请参见this answer,以了解通过在专用线程中运行阻止生成器将其转换为异步的方法。

要批处理项目,您可以构建一个中间列表并进行分派。或者,您可以使用aiostream运算符附带的出色的aiostream库,该库可以执行以下操作:

chunks
© www.soinside.com 2019 - 2024. All rights reserved.