如何使用 pandas_udf 在 pyspark 中对分组/分区数据帧进行迭代

问题描述 投票:0回答:1

火花版本:3.2

我定义了 pandas_udf

def calculate_shap(iterator: Iterator[pd.DataFrame]) -> Iterator[pd.DataFrame]:
    for X in iterator:
        yield pd.DataFrame(
            explainer.shap_values(np.array(X), check_additivity=False)[0],
            columns=columns_for_shap_calculation,
        )

return_schema = StructType()
for feature in columns_for_shap_calculation:
    return_schema = return_schema.add(StructField(feature, FloatType()))

shap_values = df.mapInPandas(calculate_shap, schema=return_schema)

在这种情况下,我如何确保当我们将 df 传递给mapInPandas时,迭代器对象将被分割成我想要设置的任何分区?

例如,如果我有 pyspark 数据帧,其中包含 100 万行,且 ID 列的值为 1,2,3,4 和

  1. 200K 行的值为 1
  2. 500K 行的值为 2
  3. 100K 行的值为 3
  4. 200K 行的值为 4

如果是这种情况,我的迭代器应该按 ID 进行分区,然后按

[200K,500K,100K,200K]
进行分割并执行 pandas_udf。

我有一些使用的想法

df = df.repartition("ID")
,然后传递给
df.mapInPandas
,但是这会改变我的分区数量,但不会改变迭代器对象吗??

或者我可以设置

df = df.groupBy("ID")
,然后传递给
df.mapInPandas
,但是我怎样才能使用groupBy来完成这项工作呢?

有没有更简单的方法来操作Iterator对象

python apache-spark pyspark user-defined-functions pandas-udf
1个回答
0
投票

重新分区数据框。在应用

repartition("ID")
之前,使用
mapInPandas
根据“ID”列对数据进行分区。

这会物理地重新排列数据,创建具有相同“ID”值的行的分区。 它直接影响

mapInPandas
内的迭代器对象。

df = df.repartition("ID")
shap_values = df.mapInPandas(calculate_shap, schema=return_schema)

不要在mapInPandas之前使用groupBy。 groupBy 创建一个 GroupedData 对象,而不是与 mapInPandas 兼容的 DataFrame。

#如果分组很重要,请编写一个函数 - 将分组和mapInPandas结合起来:

def grouped_calculate_shap(df):
    for id_value, group_df in df.groupBy("ID"):
        yield calculate_shap(group_df.toPandas())

shap_values = df.mapInPandas(grouped_calculate_shap, schema=return_schema)

希望这对您有帮助!

© www.soinside.com 2019 - 2024. All rights reserved.