我曾经在我的项目中使用
concurrent.futures.ThreadPoolExecutor
,用它在单独的线程中运行我的同步函数。
现在我主要使用
asyncio
的异步代码,但我有一个问题,我找不到使用asyncio
的解决方案。
假设我有一个异步函数
async def do_job(a, b, c, d):
if a < b:
raise CustomException("A is smaller than b")
else:
return True, "Work done"
(只是一个例子,实际上功能非常广泛,在某些情况下引发自定义异常是其中的强制部分)
假设我需要运行给定函数的100个实例,并且限制同时运行的N个实例。 我还想在执行时接收来自线程的结果,而无需等待所有任务完成(以及运行给定任务的参数),并且还处理错误和添加新实例在某些情况下,任务池。
ThreadPoolExecutor
让我实现了这个目标(不确定是否要向池中添加新任务,因为当时我不需要它),但我在
asyncio
中找不到任何满足我需求的东西。我使用 ThreadPoolExecutor 的示例
with concurrent.futures.ThreadPoolExecutor(max_threads) as executor:
futures = {executor.submit(exec_f, a, b): a for a in param_list}
for future in concurrent.futures.as_completed(futures):
param_a = futures[future]
try:
result = future.result()
# Process result
except Exception as ex:
# Process exception
请告诉我,也许我遗漏了一些东西,这可以用
asyncio
来实现,或者我应该放弃这个想法并将
asyncio
与concurrent.futures.ThreadPoolExecutor
结合起来(似乎这不是很正确)。 max_threads
在 asyncio 中控制
Semaphore
。只需使用 create_task()
即可添加新任务。例如:
import asyncio
from asyncio import Semaphore
async def do_job(a, b):
if a < b:
raise Exception("A is smaller than b")
else:
return True, "Work done"
async def task_wrapper(semaphore: Semaphore, *args):
await semaphore.acquire()
try:
result = await do_job(*args)
except Exception as ex:
pass
finally:
semaphore.release()
async def main():
semaphore = Semaphore(max_threads)
tasks = []
for a in param_list:
task = asyncio.create_task(task_wrapper(semaphore, a))
tasks.append(task)
await asyncio.gather(*tasks)
if __name__ == "__main__":
asyncio.run(main())