Python, Dask - 使用来自另一个模块的函数并映射到Dask Dataframe。

问题描述 投票:0回答:1

我试图使用map_partitions在dask数据框架上应用一堆函数。当函数在本地定义时,它可以工作,例如。

#assume a data frame df1

def upper(x):
    return x.str.upper()

def process(df,info): 
    for mapper,col in info['process']:
        df[col] = df[col].map_partitions(mapper, meta=df[col])
    return df

info = {'process':[(upper, 'column_name')]}
df = process(df1, info)

df.head()

但是当我们把代码分割成模块时,它就不能工作了。

helper.py

def upper(x):
    return x.str.upper()

def upper_lambda():
    return lambda x: x.str.upper()

main.py

import helper

#assume a data frame df1

def process(df,info): 
    for mapper,col in info['process']:
        df[col] = df[col].map_partitions(mapper, meta=df[col])
    return df

info = {'process':[(getattr(helper,'upper'), 'column_name')]}
#Tried with the lambda too.. dosent seem to work 
#info = {'process':[(helper.upper(), 'column_name')]}

df = process(df1, info)

df.head()

它只是抛回KilledWorker:("('assign-read-parquet-head-1-5-assign-77bd7b855e5e8eec82312c65361fc7c5', 0)"。

python python-3.x pandas dask dask-dataframe
1个回答
1
投票

Dask当然支持使用其他模块的函数。 然而,这些模块应该存在于你使用的所有机器上。

对于小文件,如您的 helper.py 文件,你可能想看看 Client.upload_file 来帮助你移动它。

© www.soinside.com 2019 - 2024. All rights reserved.