How to implement a producer-consumer model with aiohttp and aiofiles

Problem: I want to download a large number of files with aiohttp producers, while aiofiles consumers pull the finished downloads from a pool and write them to disk.

I have seen an implementation like this:

Reference: https://gist.github.com/showa-yojyo/4ed200d4c41f496a45a7af2612912df3

    import asyncio
    import random


    async def produce(queue, n):
        for x in range(1, n + 1):
            # produce an item
            print(f'producing {x}/{n}')
            # simulate i/o operation using sleep
            await asyncio.sleep(random.random())
    
            # put the item in the queue
            await queue.put(x)
    
    
    async def consume(queue):
        while True:
            # wait for an item from the producer
            item = await queue.get()
    
            # process the item
            print(f'consuming {item}...')
            # simulate i/o operation using sleep
            await asyncio.sleep(random.random())
    
            # Notify the queue that the item has been processed
            queue.task_done()
    
    
    async def run(n):
        queue = asyncio.Queue()
    
        # schedule consumers
        consumers = []
        for _ in range(3):
            consumer = asyncio.create_task(consume(queue))
            consumers.append(consumer)
    
        # run the producer and wait for completion
        await produce(queue, n)
        # wait until the consumer has processed all items
        await queue.join()
    
        # the consumers are still awaiting for an item, cancel them
        for consumer in consumers:
            consumer.cancel()
    
        # wait until all worker tasks are cancelled
        await asyncio.gather(*consumers, return_exceptions=True)
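
For completeness, the gist's pipeline can be kicked off with something like this (assuming Python 3.7+, where asyncio.run is available):

    asyncio.run(run(10))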

The problem is that in this example the producer generates mock data with an ordinary for loop, whereas I need to use aiohttp: the async with aiohttp.ClientSession() block should presumably live in run() and be shared by the producer, but I don't see how to create it there when the producer runs as a task. Meanwhile, the consumers are not downstream tasks of aiohttp; they run in parallel, and I want to control the number of producers and the number of consumers independently. This is where I am stuck; a rough sketch of what I am aiming for follows.
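
To make the question concrete, here is a rough sketch of the shape I have in mind. Everything in it is my own guesswork rather than a known-good solution: the two-queue layout, the default worker counts, and the filename scheme (last path segment of the URL) are all placeholders. The key idea is that the ClientSession is created once in run() and shared by every producer task, while the numbers of producers and consumers are independent parameters:

    import asyncio

    import aiofiles
    import aiohttp


    async def produce(session, url_queue, data_queue):
        # each producer pulls URLs and downloads them with the shared session
        while True:
            url = await url_queue.get()
            try:
                async with session.get(url) as resp:
                    body = await resp.read()
                # hand the downloaded bytes over to the consumers
                await data_queue.put((url, body))
            except aiohttp.ClientError as exc:
                print(f'failed to fetch {url}: {exc}')
            finally:
                url_queue.task_done()


    async def consume(data_queue):
        # each consumer writes finished downloads to disk with aiofiles
        while True:
            url, body = await data_queue.get()
            # placeholder naming scheme: last path segment of the URL
            filename = url.rsplit('/', 1)[-1] or 'index.html'
            async with aiofiles.open(filename, 'wb') as f:
                await f.write(body)
            data_queue.task_done()


    async def run(urls, n_producers=3, n_consumers=2):
        url_queue = asyncio.Queue()
        data_queue = asyncio.Queue()
        for url in urls:
            url_queue.put_nowait(url)

        # the session is created once here and shared by all producer tasks
        async with aiohttp.ClientSession() as session:
            producers = [asyncio.create_task(produce(session, url_queue, data_queue))
                         for _ in range(n_producers)]
            consumers = [asyncio.create_task(consume(data_queue))
                         for _ in range(n_consumers)]

            await url_queue.join()   # every URL has been handled
            await data_queue.join()  # every buffered download has been written

            for task in producers + consumers:
                task.cancel()
            await asyncio.gather(*producers, *consumers, return_exceptions=True)

Is sharing the session across producer tasks like this correct, or am I missing a pitfall with cancelling the tasks while the session's context manager is still open?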

Then I found another interesting design in which the producer is the main entry point. The work is first split into chunks; it then iterates over the chunks, creating one consumer task per queued chunk. But I don't think this is a true producer-consumer model. I am looking for some pointers here. Thanks.

    async def producer(self, times=1, parallel_items=1):
        jobs = []
        # Creating async queue
        queue = asyncio.Queue()
        chunk = times // parallel_items
        print(f"times={times}, parallel_items={parallel_items}, result={times//parallel_items}")
        for _ in range(chunk):
            queue.put_nowait([y for y in range(chunk)])
        print(queue.qsize())
        # The queue size is equal to the number of chunks to execute
        for _ in range(queue.qsize()):
            jobs.append(asyncio.create_task(self.consumer(queue=queue)))

        await queue.join()
        await asyncio.gather(*jobs, return_exceptions=True)
        # Telling the workers to cancel as they already returned a result (on join phase)
        for job in jobs:
            job.cancel()

        print("<Coroutine> Numbers of requests: {}".format(self.executed))

Reference: https://gist.github.com/FilippoLeone850/aeee81006cd1cba9728758811c81512f
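
If I understand the pattern correctly, the defining feature of a real producer-consumer model is a continuous hand-off through a shared queue rather than pre-chunked work, and a bounded queue would even give backpressure for free. A one-line guess at what that would change in my sketch above (the size of 10 is arbitrary):

    # bounded queue: producers block on put() once 10 items are buffered,
    # so downloads never run far ahead of the disk writes
    data_queue = asyncio.Queue(maxsize=10)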

python python-asyncio aiohttp producer-consumer python-aiofiles