Why is my consumer working separately from my producers in the queue?


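My goal is to call an API asynchronously and write the result of each call to its own file (1 call -> 1 file). I thought one way to do this would be to use a queue. My intention is for each producer to push its response onto the queue as soon as the response is ready, and for the consumer to process (write) each file as soon as it becomes available.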
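For illustration, the design described above (each response pushed onto an asyncio.Queue as soon as it is ready, one file per call) can be sketched roughly as follows. This is a minimal sketch, not the code from the question: fake_api_call is a hypothetical stand-in for the real aiohttp request, the output goes to a temporary directory, and asyncio.to_thread (Python 3.9+) is an assumed choice for keeping blocking file writes off the event loop.

```python
import asyncio
import pathlib
import tempfile


async def fake_api_call(day: int) -> str:
    # Hypothetical stand-in for the real aiohttp request; the sleep simulates network latency.
    await asyncio.sleep(0.01 * (day % 3))
    return f"response for day {day}"


async def producer(q: asyncio.Queue, day: int) -> None:
    result = await fake_api_call(day)
    await q.put((day, result))  # push this response as soon as it is ready


def write_file(path: pathlib.Path, text: str) -> None:
    path.write_text(text, encoding="utf-8")


async def consumer(q: asyncio.Queue, out_dir: pathlib.Path) -> None:
    while True:
        day, text = await q.get()
        try:
            # Blocking file I/O runs in a worker thread, so the event loop keeps serving producers.
            await asyncio.to_thread(write_file, out_dir / f"day_{day}.txt", text)
        finally:
            q.task_done()


async def main(out_dir: pathlib.Path, days) -> None:
    q: asyncio.Queue = asyncio.Queue()
    consumer_task = asyncio.create_task(consumer(q, out_dir))
    await asyncio.gather(*(producer(q, day) for day in days))
    await q.join()          # returns once task_done() has been called for every item
    consumer_task.cancel()  # the consumer loops forever, so stop it explicitly


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        asyncio.run(main(pathlib.Path(tmp), range(20)))
        print(len(list(pathlib.Path(tmp).iterdir())), "files written")
```

Because the consumer is a single long-running task that writes one file per queue item, no second process is needed for this pattern; the event loop interleaves the waiting producers with the consumer.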

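The confusion: when I run the code and look at the print statements, I see that the producers finish first and only then does the consumer start consuming the output. This doesn't match what I intended, namely that the consumer does its work as soon as a result is available. I also considered using multiple processes (one for the consumer, one for the producers), but I'm not sure I want to complicate things that way.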

Here is a sketch of what I have so far:

import aiohttp
import asyncio


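async def get_data(session, day):
    # SOME_URL, SOME_FORMAT and HEADERS are placeholders for the real endpoint, request body and headers
    async with session.post(url=SOME_URL, json=SOME_FORMAT, headers=HEADERS) as response:
        return await response.text()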


async def producer(q, day):
    async with aiohttp.ClientSession() as session:
        result = await get_data(session, day)
        await q.put(result)


async def consumer(q):
    while True:
        outcome = await q.get()
        print("Consumed:", outcome) # assuming I write files here
        q.task_done()


async def main():
    queue = asyncio.Queue()
    days = [day for day in range(20)]  # Here I normally use calendar dates instead of range
    producers = [asyncio.create_task(producer(queue, day)) for day in days]
    # a distinct name, so the task does not shadow the consumer() coroutine function
    consumer_task = asyncio.create_task(consumer(queue))
    await asyncio.gather(*producers)
    await queue.join()
    consumer_task.cancel()


if __name__ == '__main__':
    asyncio.run(main())

Am I on the right track?

python python-3.x async-await python-asyncio aiohttp
1 Answer

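Your code is generally fine (apart from a few syntax errors in the posted version, which I assume came from a bad copy-paste). It is true that all the producers start before the consumer does: asyncio.create_task only schedules a task, and the scheduled tasks begin running, in the order they were created, once main suspends at await asyncio.gather(...). A producer with nothing to wait for would therefore run to completion before the consumer ever got a turn. But when the producers do real work, such as network I/O, you will see that they finish only after the consumer has started, and each result is consumed as it arrives. Here is an edited version of your code, plus output showing that it indeed works.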
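To see in isolation why the order depends on whether the producers actually wait, here is a small offline sketch, separate from the edited version of the code. It is an illustration under assumptions: asyncio.sleep stands in for the network request, and the events list and io_delay parameter are hypothetical names used only to record the order in which things happen.

```python
import asyncio


async def producer(q: asyncio.Queue, day: int, events: list, io_delay: float) -> None:
    if io_delay:
        # Stand-in for a real network request: this await hands control back to the event loop.
        await asyncio.sleep(io_delay * (day + 1))
    await q.put(day)  # on an unbounded queue this never suspends
    events.append(f"produced {day}")


async def consumer(q: asyncio.Queue, events: list) -> None:
    events.append("consumer started")
    while True:
        day = await q.get()  # suspends only when the queue is empty
        events.append(f"consumed {day}")
        q.task_done()


async def main(io_delay: float, n: int = 3) -> list:
    events: list = []
    q: asyncio.Queue = asyncio.Queue()
    # create_task() only schedules the tasks; they start running, in creation order,
    # once main() suspends at the gather() below.
    producers = [asyncio.create_task(producer(q, d, events, io_delay)) for d in range(n)]
    consumer_task = asyncio.create_task(consumer(q, events))
    await asyncio.gather(*producers)
    await q.join()
    consumer_task.cancel()
    return events


if __name__ == "__main__":
    # No waiting: every producer finishes before the consumer's first step.
    print(asyncio.run(main(io_delay=0)))
    # prints ['produced 0', 'produced 1', 'produced 2', 'consumer started', 'consumed 0', 'consumed 1', 'consumed 2']
    # Simulated I/O: the consumer starts first and consumes items as they arrive.
    print(asyncio.run(main(io_delay=0.05)))
```

With io_delay=0 the run reproduces what the question describes (all producers first); with a non-zero delay the consumer is already waiting on the queue before any producer has finished.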

import aiohttp
import asyncio

async def get_data(session, day):
    print(f"get data, day {day}")
    async with session.get(url="https://www.google.com") as response:
        res = await response.text()
        print(f"got data, day {day}")
        return res[:100]

async def producer(q, day):
    async with aiohttp.ClientSession() as session:
        result = await get_data(session, day)
        await q.put(result)

async def consumer(q):
    print("Consumer stated")
    while True:
        outcome = await q.get()
        print("Consumed:", outcome) # assuming I write files here
        asyncio.sleep(1)  # not awaited, so it never pauses (hence the RuntimeWarning in the output); should be: await asyncio.sleep(1)
        q.task_done()

async def main():
    queue = asyncio.Queue()
    days = [day for day in range(20)]  # Here I normally use calendar dates instead of range
    producers = [asyncio.create_task(producer(queue, day)) for day in days]
    print("main: producer tasks created")
    consumer_task = asyncio.create_task(consumer(queue))
    print("main: consumer task created")
    await asyncio.gather(*producers)
    print("main: gathered producers")
    await queue.join()
    consumer_task.cancel()

if __name__ == '__main__':
    asyncio.run(main())

Output:

main: producer tasks created
main: consumer task created
get data, day 0
get data, day 1
get data, day 2
get data, day 3
...
get data, day 19
Consumer stated
got data, day 1
Consumed: <!doctype html><html itemscope="" itemtype="http://schema.org/WebPage" lang="en"><head><meta content
queue_so.py:21: RuntimeWarning: coroutine 'sleep' was never awaited
  asyncio.sleep(1)
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
got data, day 10
Consumed: <!doctype html><html itemscope="" itemtype="http://schema.org/WebPage" lang="en"><head><meta content
got data, day 19
got data, day 11
got data, day 14
Consumed: <!doctype html><html itemscope="" itemtype="http://schema.org/WebPage" lang="en"><head><meta content
Consumed: <!doctype html><html itemscope="" itemtype="http://schema.org/WebPage" lang="en"><head><meta content
Consumed: <!doctype html><html itemscope="" itemtype="http://schema.org/WebPage" lang="en"><head><meta content
got data, day 15
got data, day 17
got data, day 6
got data, day 18
Consumed: <!doctype html><html itemscope="" itemtype="http://schema.org/WebPage" lang="en"><head><meta content
Consumed: <!doctype html><html itemscope="" itemtype="http://schema.org/WebPage" lang="en"><head><meta content
Consumed: <!doctype html><html itemscope="" itemtype="http://schema.org/WebPage" lang="en"><head><meta content
Consumed: <!doctype html><html itemscope="" itemtype="http://schema.org/WebPage" lang="en"><head><meta content
got data, day 7
Consumed: <!doctype html><html itemscope="" itemtype="http://schema.org/WebPage" lang="en"><head><meta content
got data, day 8
got data, day 9
got data, day 2
got data, day 12
Consumed: <!doctype html><html itemscope="" itemtype="http://schema.org/WebPage" lang="en"><head><meta content
Consumed: <!doctype html><html itemscope="" itemtype="http://schema.org/WebPage" lang="en"><head><meta content
Consumed: <!doctype html><html itemscope="" itemtype="http://schema.org/WebPage" lang="en"><head><meta content
Consumed: <!doctype html><html itemscope="" itemtype="http://schema.org/WebPage" lang="en"><head><meta content
got data, day 0
got data, day 5
Consumed: <!doctype html><html itemscope="" itemtype="http://schema.org/WebPage" lang="en"><head><meta content
Consumed: <!doctype html><html itemscope="" itemtype="http://schema.org/WebPage" lang="en"><head><meta content
got data, day 4
Consumed: <!doctype html><html itemscope="" itemtype="http://schema.org/WebPage" lang="en"><head><meta content
got data, day 3
Consumed: <!doctype html><html itemscope="" itemtype="http://schema.org/WebPage" lang="en"><head><meta content
got data, day 13
Consumed: <!doctype html><html itemscope="" itemtype="http://schema.org/WebPage" lang="en"><head><meta content
got data, day 16
Consumed: <!doctype html><html itemscope="" itemtype="http://schema.org/WebPage" lang="en"><head><meta content
main: gathered producers
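The "RuntimeWarning: coroutine 'sleep' was never awaited" in this output comes from the consumer calling asyncio.sleep(1) without await: calling a coroutine function only creates a coroutine object, so the consumer never actually pauses. The short sketch below compares the two forms; the function names and the 0.2 s duration are illustrative assumptions, not taken from the answer.

```python
import asyncio
import time


async def pause_without_await() -> float:
    start = time.perf_counter()
    coro = asyncio.sleep(0.2)  # only creates a coroutine object; nothing is scheduled
    coro.close()               # close it explicitly, which avoids the "never awaited" warning
    return time.perf_counter() - start


async def pause_with_await() -> float:
    start = time.perf_counter()
    await asyncio.sleep(0.2)   # actually suspends this task for about 0.2 s
    return time.perf_counter() - start


if __name__ == "__main__":
    print(f"without await: {asyncio.run(pause_without_await()):.3f} s")
    print(f"with await:    {asyncio.run(pause_with_await()):.3f} s")
```

In the consumer, the awaited form (await asyncio.sleep(1)) is what would actually simulate a slow file write and give the producers time to run in between.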
