I have a list of items to process, and I'd like to process them in parallel for efficiency. However, while processing one item I may discover additional items that need to be added to the list.
I've looked at the multiprocessing and concurrent libraries, but I can't find a queue of this kind that can be modified at runtime, after it has been handed to a pool. Is there a solution that does what I want?
Here is some code demonstrating what I'm after:
i = 0
jobs_to_be_processed = [f'job{(i := i + 1)}' for _ in range(5)]

def process_job(job):
    global i  # needed: the walrus assignment below rebinds i
    if int(job[-1]) % 3 == 0:
        jobs_to_be_processed.append(f'new job{(i := i + 1)}')
    # do process job ...
    pass

# Add jobs to a pool that allows `jobs_to_be_processed`
# to have jobs added while processing
pool = AsyncJobPool(jobs_to_be_processed)
pool.start()
pool.join()
IIUC, you can use an asyncio.Queue as the place where you put the items to be processed, e.g.:
import asyncio

async def worker(queue: asyncio.Queue):
    while True:
        item = await queue.get()
        if item == 'spawn more jobs':
            print('Spawning more jobs!')
            queue.put_nowait('other1')
            queue.put_nowait('other2')
        else:
            await asyncio.sleep(1)
            print(f'Processed job item: {item}')
        queue.task_done()

async def main():
    q = asyncio.Queue()

    # we have a pool of 2 workers that work concurrently:
    workers = [asyncio.create_task(worker(q)) for i in range(2)]

    # initially we have 4 job items (one item spawns 2 more jobs):
    for job in ['job1', 'job2', 'spawn more jobs', 'job3']:
        q.put_nowait(job)

    await q.join()

    for w in workers:
        w.cancel()

asyncio.run(main())
Prints:
Processed job item: job1
Spawning more jobs!
Processed job item: job2
Processed job item: job3
Processed job item: other1
Processed job item: other2
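Since the question mentions multiprocessing and concurrent, it may be worth noting that the same pattern also works without asyncio: the standard queue.Queue is thread-safe, workers can enqueue new items while others are still running, and Queue.join() blocks until every item (including ones added mid-run) has been marked done. A minimal thread-based sketch of the same idea (job names are just placeholders):

```python
import queue
import threading

processed = []  # list.append is thread-safe, so plain list is fine here

def worker(q: queue.Queue):
    while True:
        item = q.get()
        if item is None:  # sentinel: shut this worker down
            q.task_done()
            return
        if item == 'spawn more jobs':
            # new work can be added while the pool is already running
            q.put('other1')
            q.put('other2')
        else:
            processed.append(item)
            print(f'Processed job item: {item}')
        q.task_done()

q = queue.Queue()
for job in ['job1', 'job2', 'spawn more jobs', 'job3']:
    q.put(job)

threads = [threading.Thread(target=worker, args=(q,)) for _ in range(2)]
for t in threads:
    t.start()

q.join()  # blocks until all items, including the spawned ones, are done

for _ in threads:  # one sentinel per worker to stop them
    q.put(None)
for t in threads:
    t.join()
```

The `q.put()` before `q.task_done()` inside the worker is what keeps `q.join()` from returning early: the unfinished-task count never drops to zero until the spawned items are processed too.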