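Question: I want to download a large number of files with an aiohttp producer and have aiofiles consumers pick the downloaded files up from a pool.
I have seen an implementation like this:
Ref. https://gist.github.com/showa-yojyo/4ed200d4c41f496a45a7af2612912df3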
import asyncio
import random

async def produce(queue, n):
    for x in range(1, n + 1):
        # produce an item
        print(f'producing {x}/{n}')
        # simulate i/o operation using sleep
        await asyncio.sleep(random.random())
        # put the item in the queue
        await queue.put(x)

async def consume(queue):
    while True:
        # wait for an item from the producer
        item = await queue.get()
        # process the item
        print(f'consuming {item}...')
        # simulate i/o operation using sleep
        await asyncio.sleep(random.random())
        # notify the queue that the item has been processed
        queue.task_done()

async def run(n):
    queue = asyncio.Queue()
    # schedule consumers
    consumers = []
    for _ in range(3):
        consumer = asyncio.create_task(consume(queue))
        consumers.append(consumer)
    # run the producer and wait for completion
    await produce(queue, n)
    # wait until the consumers have processed all items
    await queue.join()
    # the consumers are still waiting for an item, cancel them
    for consumer in consumers:
        consumer.cancel()
    # wait until all worker tasks are cancelled
    await asyncio.gather(*consumers, return_exceptions=True)
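The problem is that in this example the producer generates mock data in a plain for loop, whereas I need to use aiohttp. I assume the async with aiohttp.ClientSession() block belongs in run(), but I am not sure the session is created properly there once the producers run as tasks. Also, the consumers should not be downstream steps inside the aiohttp tasks; they should run in parallel with them. (I want to control the number of producers and the number of consumers separately.) This is where I am stuck.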
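One possible shape for this, as a rough sketch rather than a confirmed answer: open the session once in the entry point and hand it to the producer tasks through a closure, with one queue feeding URLs to the producers and a second, bounded queue feeding downloaded bytes to the consumers. The names `run_pipeline`, `producer`, `consumer`, `fetch`, `save`, `main`, the two-queue layout and the file-naming rule are all assumptions made for illustration; only `asyncio`, `aiohttp.ClientSession`, `session.get`, `resp.read` and `aiofiles.open` are real library calls.

```python
import asyncio

async def producer(url_queue, data_queue, fetch):
    # Pull URLs until cancelled; push (url, bytes) to the consumers.
    while True:
        url = await url_queue.get()
        try:
            data = await fetch(url)
            await data_queue.put((url, data))
        except Exception as exc:
            print(f"download failed for {url}: {exc!r}")
        finally:
            url_queue.task_done()

async def consumer(data_queue, save):
    # Pull downloaded payloads until cancelled and persist them.
    while True:
        url, data = await data_queue.get()
        try:
            await save(url, data)
        except Exception as exc:
            print(f"save failed for {url}: {exc!r}")
        finally:
            data_queue.task_done()

async def run_pipeline(urls, fetch, save, n_producers=4, n_consumers=2, max_pending=10):
    url_queue = asyncio.Queue()
    # Bounded, so fast downloads cannot pile up unboundedly in memory.
    data_queue = asyncio.Queue(maxsize=max_pending)
    for url in urls:
        url_queue.put_nowait(url)
    # Producer and consumer counts are chosen independently.
    producers = [asyncio.create_task(producer(url_queue, data_queue, fetch))
                 for _ in range(n_producers)]
    consumers = [asyncio.create_task(consumer(data_queue, save))
                 for _ in range(n_consumers)]
    await url_queue.join()   # every URL fetched (or failed)
    await data_queue.join()  # every payload saved (or failed)
    for task in producers + consumers:
        task.cancel()
    await asyncio.gather(*producers, *consumers, return_exceptions=True)

async def main(urls):
    # Third-party imports kept local so the queue logic above is stdlib-only.
    import aiofiles
    import aiohttp

    # The session is opened once here and shared by all producer tasks.
    async with aiohttp.ClientSession() as session:
        async def fetch(url):
            async with session.get(url) as resp:
                resp.raise_for_status()
                return await resp.read()

        async def save(url, data):
            # Hypothetical naming rule: last path segment of the URL.
            path = url.rsplit("/", 1)[-1] or "download.bin"
            async with aiofiles.open(path, "wb") as f:
                await f.write(data)

        await run_pipeline(urls, fetch, save)
```

Because `run_pipeline` is awaited inside the `async with` block, the session stays open for as long as the producer tasks are running, and `asyncio.run(main(list_of_urls))` would be the entry point.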
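Then there is this interesting design, where the producer is the main entry point. The job is first split into chunks. The producer then loops over the chunks and creates a consumer task on each iteration. But I feel this is not really a producer-consumer model. I am looking for some inspiration here. Thanks.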
async def producer(self, times=1, parallel_items=1):
    jobs = []
    # Creating async queue
    queue = asyncio.Queue()
    chunk = times // parallel_items
    print(f"times={times}, parallel_items={parallel_items}, result={times//parallel_items}")
    for _ in range(chunk):
        queue.put_nowait([y for y in range(chunk)])
    print(queue.qsize())
    # The queue size is equal to the number of chunks to execute
    for _ in range(queue.qsize()):
        jobs.append(asyncio.create_task(self.consumer(queue=queue)))
    await queue.join()
    await asyncio.gather(*jobs, return_exceptions=True)
    # Telling the workers to cancel as they already returned a result (on join phase)
    for job in jobs:
        job.cancel()
    print("<Coroutine> Numbers of requests: {}".format(self.executed))
Ref. https://gist.github.com/FilippoLeone850/aeee81006cd1cba9728758811c81512f
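For comparison, the chunked design seems closer to a worker pool draining a pre-filled queue than to a streaming producer/consumer, since nothing is produced while the workers run. A minimal sketch of that shape, assuming each worker returns once the queue is empty (the gist's own consumer is not shown here, so `drain_worker` and `run_chunks` are hypothetical names and the summing step is only a stand-in for real work):

```python
import asyncio

async def drain_worker(queue, results):
    # Process chunks until the pre-filled queue is empty, then return.
    while True:
        try:
            chunk = queue.get_nowait()
        except asyncio.QueueEmpty:
            return
        await asyncio.sleep(0)  # stand-in for real async I/O on the chunk
        results.append(sum(chunk))
        queue.task_done()

async def run_chunks(items, chunk_size, n_workers):
    queue = asyncio.Queue()
    # All work is queued up front; there is no concurrent producer.
    for start in range(0, len(items), chunk_size):
        queue.put_nowait(items[start:start + chunk_size])
    results = []
    # Workers finish on their own, so no cancel() is needed afterwards.
    await asyncio.gather(*(drain_worker(queue, results) for _ in range(n_workers)))
    return results
```

Here the number of workers is a free parameter, but there is no second stage that runs alongside the downloads, which may be why it does not feel like a true producer-consumer setup.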