具有多个参数和kwarg的函数的Multiprocessing.pool

问题描述 投票:0回答:1

我想使用mutliprocessing.pool方法来并行化计算。问题是我想在计算中使用的函数有两个args和可选的kwargs,第一个参数是数据帧,第二个参数是str,任何kwargs都是字典。

我要使用的数据框和字典对于我要执行的所有计算都相同,只是第二个参数不断变化。因此,我希望能够使用map方法将其作为不同字符串的列表传递给具有df和dict的已打包函数。

from utils import *
import multiprocessing
from functools import partial



def sumifs(df, result_col, **kwargs):

    compare_cols = list(kwargs.keys())
    operators = {}
    for col in compare_cols:
        if type(kwargs[col]) == tuple:
            operators[col] = kwargs[col][0]
            kwargs[col] = list(kwargs[col][1])
        else:
            operators[col] = operator.eq
            kwargs[col] = list(kwargs[col])
    result = []
    cache = {}
    # Go through each value
    for i in range(len(kwargs[compare_cols[0]])):
        compare_values = [kwargs[col][i] for col in compare_cols]
        cache_key = ','.join([str(s) for s in compare_values])
        if (cache_key in cache):
            entry = cache[cache_key]
        else:
            df_copy = df.copy()
            for compare_col, compare_value in zip(compare_cols, compare_values):
                df_copy = df_copy.loc[operators[compare_col](df_copy[compare_col], compare_value)]
            entry = df_copy[result_col].sum()
            cache[cache_key] = entry
        result.append(entry)
    return pd.Series(result)

if __name__ == '__main__':

    ca = read_in_table('Tab1')
    total_consumer_ids = len(ca)

    base = pd.DataFrame()
    base['ID'] = range(1, total_consumer_ids + 1)


    second_arg = ['A', 'B', 'C']
    keywords = {'Z': base['Consumer archetype ID']}

    max_number_processes = multiprocessing.cpu_count()
    with multiprocessing.Pool(processes=max_number_processes) as pool:
        results = pool.map(partial(sumifs, a=ca, kwargs=keywords), tasks)
    print(results)

但是,当我运行上面的代码时,出现以下错误:TypeError: sumifs() missing 1 required positional argument: 'result_col'。我如何为函数提供第一个arg和kwargs,同时将第二个参数作为str列表提供,以便可以并行计算?我已经在论坛上阅读了几个类似的问题,但是似乎没有一种解决方案可用于这种情况...

谢谢您,如果不清楚,我今天很了解多处理程序包!

python multiprocessing pool args kwargs
1个回答
0
投票

让我们看一下代码的两部分,sumifs函数声明:

def sumifs(df, result_col, **kwargs):

现在让我们看一下带有相关参数的对该函数的调用。

# Those are the params
ca = read_in_table('Tab1')
keywords = {'Z': base['Consumer archetype ID']}

# This is the function call
results = pool.map(partial(sumifs, a=ca, kwargs=keywords), tasks)

您收到错误TypeError: sumifs() missing 1 required positional argument: 'result_col',因为作为keywords发送给函数的kwargs词典不包含result_col参数。它只包含Z参数。

© www.soinside.com 2019 - 2024. All rights reserved.