Dask DataFrame:使用 map_partitions 从旧列创建 2 个新列并分配和自定义函数?

问题描述 投票:0回答:0

我有一个 3 mio 的 csv。行。 包含测量数据的列不是纯粹的浮点数。 它包含“<" and ">”。我想创建两个新列,一个包含“<", ">”或“=”,另一个包含浮点数。 我可以用 pandas 做到这一点,但在一台像样的电脑上需要超过 2 分钟。 因此,我想通过使用 dask 数据帧进行并行处理来加快速度。

我在处理函数中从数据帧中获取值并将其转换为字符串以便事后检查时遇到问题。

我还想只有一个过程函数,因为两个新列都需要对旧列进行相同的过滤。

df_p = pd.read_csv(BytesIO(mw_csv_b.read()), sep = ";", index_col=["sl_nr"], \
encoding='utf-8-sig', dtype = {"messergebnis_c":str ,"messergebnis_hinweis":str }, \
parse_dates = ["datum_pn", "aktual_dat", "erstell_dat"])

df_qual = dd.from_pandas(df_p, npartitions = 6)

dask 将 df_qual["messergebnis_c"] 显示为 'object' dtype,这就是我在以下函数中首先将其转换为字符串的原因。

def is_float(element: str) -> bool:
    try:
        float(element)
        return True
    except ValueError:
        return False

def process_g(g):
    if is_float(g):
        return "="
    elif str(g)[0] == "<":
        return "<"
    elif str(g)[0] == ">":
        return ">"

def process_m(m):
    if is_float(m):
        return float(m)
    elif str(m)[0] == "<":
        return float(str(m)[1:])
    elif str(m)[0] == ">":
        return float(str(m)[1:])

如何正确使用 map_partitions 并分配两个新列?

ddf = df_qual.map_partitions(lambda df: df.assign(grenze = lambda r: \
process_g(r["messergebnis_c"]), messergebnis_num = lambda r: \
process_m(r["messergebnis_c"]) ))

ddf.compute()

有没有更好/更快的方法(例如,只有一个函数可以重新调整 DataFrame 或两个 Series,然后可以将其分配给 dask DataFrame)?
我有点困惑,因为它对我来说很新。帮助表示赞赏。

python lambda dask assign dask-dataframe
© www.soinside.com 2019 - 2024. All rights reserved.