火花版本:3.2
我定义了 pandas_udf
def calculate_shap(iterator: Iterator[pd.DataFrame]) -> Iterator[pd.DataFrame]:
for X in iterator:
yield pd.DataFrame(
explainer.shap_values(np.array(X), check_additivity=False)[0],
columns=columns_for_shap_calculation,
)
return_schema = StructType()
for feature in columns_for_shap_calculation:
return_schema = return_schema.add(StructField(feature, FloatType()))
shap_values = df.mapInPandas(calculate_shap, schema=return_schema)
在这种情况下,我如何确保当我们将 df 传递给mapInPandas时,迭代器对象将被分割成我想要设置的任何分区?
例如,如果我有 pyspark 数据帧,其中包含 100 万行,且 ID 列的值为 1,2,3,4 和
如果是这种情况,我的迭代器应该按 ID 进行分区,然后按
[200K,500K,100K,200K]
进行分割并执行 pandas_udf。
我有一些使用的想法
df = df.repartition("ID")
,然后传递给df.mapInPandas
,但是这会改变我的分区数量,但不会改变迭代器对象吗??
或者我可以设置
df = df.groupBy("ID")
,然后传递给df.mapInPandas
,但是我怎样才能使用groupBy来完成这项工作呢?
有没有更简单的方法来操作Iterator对象?
重新分区数据框。在应用
repartition("ID")
之前,使用 mapInPandas
根据“ID”列对数据进行分区。
这会物理地重新排列数据,创建具有相同“ID”值的行的分区。 它直接影响
mapInPandas
内的迭代器对象。
df = df.repartition("ID")
shap_values = df.mapInPandas(calculate_shap, schema=return_schema)
不要在mapInPandas之前使用groupBy。 groupBy 创建一个 GroupedData 对象,而不是与 mapInPandas 兼容的 DataFrame。
#如果分组很重要,请编写一个函数 - 将分组和mapInPandas结合起来:
def grouped_calculate_shap(df):
for id_value, group_df in df.groupBy("ID"):
yield calculate_shap(group_df.toPandas())
shap_values = df.mapInPandas(grouped_calculate_shap, schema=return_schema)
希望这对您有帮助!