我有一个数据集,我想使用多个Pyspark SQLGrouped MapUDF在AWS EMR的临时集群中运行的较大ETL过程的不同阶段映射。分组地图API要求在应用之前先将Pyspark数据框分组,但是我实际上不需要分组密钥。
目前,我正在使用任意分组,该分组有效,但结果是:
不必要的随机播放。
每个作业中任意分组的无效代码。
我的理想解决方案是应用矢量化的Pandas UDF,而无需进行任意分组,但是如果我可以保存任意分组,至少可以消除混洗。
编辑:
这是我的代码的样子。我最初使用的是任意分组,但目前基于@pault的以下注释正在尝试spark_partition_id()
。
@pandas_udf(b_schema, PandasUDFType.GROUPED_MAP)
def transform(a_partition):
b = a_partition.drop("pid", axis=1)
# Some other transform stuff
return b
(sql
.read.parquet(a_path)
.withColumn("pid", spark_partition_id())
.groupBy("pid")
.apply(transform)
.write.parquet(b_path))
使用spark_partition_id()
似乎仍然会造成随机播放。我得到以下DAG:
要支持大致等效的逻辑(功能(pandas.core.frame.DataFrame) -> pandas.core.frame.DataFrame
),您必须切换到Spark 3.0.0并使用MAP_ITER
转换。
在最新的预览版本(3.0.0-preview2)中,您需要一个UDF:
@pandas_udf(b_schema, PandasUDFType.MAP_ITER)
def transform(dfs):
for df in dfs:
b = df.drop("pid", axis=1)
...
yield b
df.mapInPandas(transform)
以及在即将发布的3.0.0版本(SPARK-28264)中只是一个简单的功能:
def transform(dfs):
for df in dfs:
b = df.drop("pid", axis=1)
# Some other transform stuff
...
yield b
df.mapInPandas(transform, b_schema)
在2.x上可能的解决方法是使用普通SCALAR
UDF,将结果的每一行序列化为JSON,然后在另一侧反序列化,即,
import json
from pyspark.sql.functions import from_json
@pandas_udf("string", PandasUDFType.SCALAR)
def transform(col1, col2):
b = pd.DataFrame({"x": col1, "y": col2})
...
return b.apply(lambda x: json.dumps(dict(zip(df.columns, x))), axis=1)
(df
.withColumn("json_result", transform("col1", "col2"))
.withColumn("a_struct", from_json("json_result", b_schema)))