AsyncResult 与从 apply_async() 获取许多结果的回调

问题描述 投票:0回答:1

当使用 Python 的

apply_async()
中的
multiprocessing.Pool
方法时,有两种存储返回值的选项 - 保存
AsyncResult
对象并调用
.get()
,或者使用回调,即:

# Using AsnycResult
def process_data():
    results = []
    for i in range(n):
        result = pool.apply_async(func, args)
        results.append(result)

    pool.close()
    pool.join() # Not strictly necessary, since .get() will block anyway
    data = [r.get() for r in results]
    return data

# Using callback
data = []

def process_data()
    for i in range(n):
        pool.apply_async(func, args, callback=save_result)

    pool.close()
    pool.join()

def save_result(result):
    data.append(result)

一种方式比另一种方式更“规范”吗?假设我们向池中提交许多(数千)个作业,这两种方法的优点/缺点是什么?

AsyncResult
方法消除了对全局变量的需求,但它需要第二个列表(
AsyncResult
对象)——这是否有效地使所需的 RAM 加倍?

python callback multiprocessing python-multiprocessing
1个回答
0
投票

正如 Aaron 提到的,使用回调将允许您按照返回结果的顺序处理工作函数

func
的返回值,这不一定是任务提交的顺序。因此,data的第i
th
元素通常不会是与为args的第i
th
元素创建的任务相对应的返回值。如果您希望在返回结果时对其进行处理,并且让
data
的元素处于任务提交顺序结果中,那么您可以将
data
预先分配为 n
None
元素的列表,其中 n 是正在提交的任务数。然后,您必须将一个额外的结果索引参数传递给您的辅助函数,该函数将返回索引以及实际结果。通过这种方式,您的回调知道需要针对给定结果更新
data
的哪个元素。

如果有可能

func
提出例外怎么办?请注意,在第一个方法中,如果相应的提交任务以异常结束,对
r.get()
的调用将引发异常。因此,如果可能出现此类异常并且您想要处理它,则需要将对
r.get()
的调用包含在“try/catch”块中:

    ...
    data = []
    for r in results:
        try:
            data.append(r.get())
        except Exception as e:
            # How do you want to handle the exception?
            ...

要使用回调处理工作函数中可能出现的异常,您需要指定 error_callback 参数:

        ...
        pool.apply_async(func, args, callback=save_result, error_callback=handle_exception)
        ...

def handle_exception(e):
    # How do you want to handle the exception?
    ...
© www.soinside.com 2019 - 2024. All rights reserved.