当使用 Python 的
apply_async()
中的 multiprocessing.Pool
方法时,有两种存储返回值的选项 - 保存 AsyncResult
对象并调用 .get()
,或者使用回调,即:
# Using AsnycResult
def process_data():
results = []
for i in range(n):
result = pool.apply_async(func, args)
results.append(result)
pool.close()
pool.join() # Not strictly necessary, since .get() will block anyway
data = [r.get() for r in results]
return data
或
# Using callback
data = []
def process_data()
for i in range(n):
pool.apply_async(func, args, callback=save_result)
pool.close()
pool.join()
def save_result(result):
data.append(result)
一种方式比另一种方式更“规范”吗?假设我们向池中提交许多(数千)个作业,这两种方法的优点/缺点是什么?
AsyncResult
方法消除了对全局变量的需求,但它需要第二个列表(AsyncResult
对象)——这是否有效地使所需的 RAM 加倍?
正如 Aaron 提到的,使用回调将允许您按照返回结果的顺序处理工作函数
func
的返回值,这不一定是任务提交的顺序。因此,data
的第ith元素通常不会是与为
args
的第ith元素创建的任务相对应的返回值。如果您希望在返回结果时对其进行处理,并且让
data
的元素处于任务提交顺序结果中,那么您可以将 data
预先分配为 n None
元素的列表,其中 n 是正在提交的任务数。然后,您必须将一个额外的结果索引参数传递给您的辅助函数,该函数将返回索引以及实际结果。通过这种方式,您的回调知道需要针对给定结果更新 data
的哪个元素。
如果有可能
func
提出例外怎么办?请注意,在第一个方法中,如果相应的提交任务以异常结束,对 r.get()
的调用将引发异常。因此,如果可能出现此类异常并且您想要处理它,则需要将对 r.get()
的调用包含在“try/catch”块中:
...
data = []
for r in results:
try:
data.append(r.get())
except Exception as e:
# How do you want to handle the exception?
...
要使用回调处理工作函数中可能出现的异常,您需要指定 error_callback 参数:
...
pool.apply_async(func, args, callback=save_result, error_callback=handle_exception)
...
def handle_exception(e):
# How do you want to handle the exception?
...