ThreadPoolExecutor asyncio 类似物?

问题描述 投票:0回答:1

我曾经在我的项目中使用

concurrent.futures.ThreadPoolExecutor
,用它在单独的线程中运行我的同步函数。

现在我主要使用

asyncio
的异步代码,但我有一个问题,我找不到使用
asyncio
的解决方案。

假设我有一个异步函数

async def do_job(a, b, c, d):
     if a < b:
         raise CustomException("A is smaller than b")
     else:
         return True, "Work done"

(只是一个例子,实际上功能非常广泛,在某些情况下引发自定义异常是其中的强制部分)

假设我需要运行给定函数的100个实例,并且限制同时运行的N个实例。 我还想在执行时接收来自线程的结果,而无需等待所有任务完成(以及运行给定任务的参数),并且还处理错误添加新实例在某些情况下,任务池

ThreadPoolExecutor

让我实现了这个目标(不确定是否要向池中添加新任务,因为当时我不需要它),但我在

asyncio
中找不到任何满足我需求的东西。
我使用 ThreadPoolExecutor 的示例

with concurrent.futures.ThreadPoolExecutor(max_threads) as executor: futures = {executor.submit(exec_f, a, b): a for a in param_list} for future in concurrent.futures.as_completed(futures): param_a = futures[future] try: result = future.result() # Process result except Exception as ex: # Process exception

请告诉我,也许我遗漏了一些东西,这可以用
asyncio

来实现,或者我应该放弃这个想法并将

asyncio
concurrent.futures.ThreadPoolExecutor
结合起来(似乎这不是很正确)。
    

python-3.x asynchronous concurrency python-asyncio concurrent.futures
1个回答
0
投票
max_threads

在 asyncio 中控制

Semaphore
。只需使用
create_task()
即可添加新任务。
例如:

import asyncio from asyncio import Semaphore async def do_job(a, b): if a < b: raise Exception("A is smaller than b") else: return True, "Work done" async def task_wrapper(semaphore: Semaphore, *args): await semaphore.acquire() try: result = await do_job(*args) except Exception as ex: pass finally: semaphore.release() async def main(): semaphore = Semaphore(max_threads) tasks = [] for a in param_list: task = asyncio.create_task(task_wrapper(semaphore, a)) tasks.append(task) await asyncio.gather(*tasks) if __name__ == "__main__": asyncio.run(main())

© www.soinside.com 2019 - 2024. All rights reserved.