我有一辆Spark DF in_df
有300多列,其中一列为字符串,其余为双数。我需要在上面运行GroupedMap Pandas UDF,并在运行前定义输出的schema。在输出的列数应该相同但类型不同的情况下,如何定义该模式?我所能找到的几个Pandas UDF的例子中,通常只是使用了以下模式 in
作为输出模式。
我见过的一种方法是用 withColumn
和 cast()
关于 in_df
. 这是最好的做法吗?如果我希望我的输出是一个完全不同的形状与 in_df
但列数太多,无法手工编码?我一直找不到好的资源。
Uisng pyspark.sql.types.StructType.fromJson()
你可以从json中动态地构建模式。
根据你的要求,我改变了数据类型为 "col_e",你可以根据你的使用情况将DataTypes改为一列或多列。
df = spark.read.csv('test.csv',header=True,inferSchema=True)
fields = []
for f in json.loads(df.schema.json())["fields"]:
if f["name"] == "col_e":
fields.append(StructField("col_e", StringType(), True))
else:
fields.append(StructField.fromJson(f))
schema = StructType(fields)
@F.pandas_udf(schema, F.PandasUDFType.GROUPED_MAP)
def many_cols_data(pdf):
pdf['col_e'] = "test"
return pdf
df.groupBy(
'col_a'
).apply(
many_cols_data
).show()
输入文件test.csv
col_a,col_b,col_c,col_d,col_e
a,2,3,4,5
b,2,3,4,5
c,2,3,4,5
结果
+-----+-----+-----+-----+-----+
|col_a|col_b|col_c|col_d|col_e|
+-----+-----+-----+-----+-----+
| c| 2| 3| 4| test|
| b| 2| 3| 4| test|
| a| 2| 3| 4| test|
+-----+-----+-----+-----+-----+