给出:
小样本 pandas 数据框:
import pandas as pd
import numpy as np
import dask.dataframe as dd
df = pd.DataFrame({"usr": ["ip1", "ip7", "ip12", "ip4"], "colB": [1, 2, 3, 0], "ColA": [3, np.nan, 7, 1]}, dtype="float32").set_index("usr")
colB ColA
usr
ip1 1.0 3.0
ip7 2.0 NaN
ip12 3.0 7.0
ip4 0.0 1.0
我可以使用
sort_index
和 reindex
对此数据帧的索引和列进行排序,如下所示:
df_s = df.sort_index(key=lambda x: ( x.to_series().str[2:].astype(int) )) # sort index
df_s = df_s.reindex(columns=sorted(df_s.columns)) # sort columns
ColA colB
usr
ip1 3.0 1.0
ip4 1.0 0.0
ip7 NaN 2.0
ip12 7.0 3.0
问题:
我的真实数据集是一个大型数据框,我使用 Dask 从并行计算中受益。由于Dask中不存在
sort_index
,我尝试使用sort_values
,如下所示:
ddf = dd.from_pandas(df, npartitions=2)
ddf_s = ddf.map_partitions(lambda inp_ddf: inp_ddf.sort_values( ["usr"], ascending=True) ).compute()
但与我的
df_s
相比,我得到了完全不同的结果。索引和列都没有正确排序。
ColA colB
usr
ip1 3.0 1.0
ip4 1.0 0.0
ip7 NaN 2.0
ip12 7.0 3.0
如何在 Dask 中对索引和列进行排序?
干杯,
这实际上与 Dask 与 Pandas 无关。 您对索引和值进行不同的排序。
df.sort_index(key=lambda x: ( x.to_series().str[2:].astype(int) ))
对
inp_ddf.sort_values( ["usr"], ascending=True)
# that is equivalent to
df.sort_values( ["usr"], ascending=True)
要解决此问题,只需将两者与相同的
key
参数对齐即可。
ddf = dd.from_pandas(df, npartitions=2)
ddf_s = ddf.map_partitions(
lambda inp_ddf: inp_ddf.sort_values(
["usr"],
ascending=True,
key=lambda x: x.str[2:].astype(np.int64)
),
meta=df
).compute()
PS。关于
meta=df
。请阅读此处使用 lambda 函数添加列时的 Dask map_partitions 元数据