如何为Spark DataFrame定义一个多列的模式?

问题描述 投票:1回答:1

我有一辆Spark DF in_df 有300多列,其中一列为字符串,其余为双数。我需要在上面运行GroupedMap Pandas UDF,并在运行前定义输出的schema。在输出的列数应该相同但类型不同的情况下,如何定义该模式?我所能找到的几个Pandas UDF的例子中,通常只是使用了以下模式 in 作为输出模式。

我见过的一种方法是用 withColumncast() 关于 in_df. 这是最好的做法吗?如果我希望我的输出是一个完全不同的形状与 in_df 但列数太多,无法手工编码?我一直找不到好的资源。

python pandas apache-spark pyspark pandas-groupby
1个回答
0
投票

Uisng pyspark.sql.types.StructType.fromJson() 你可以从json中动态地构建模式。

根据你的要求,我改变了数据类型为 "col_e",你可以根据你的使用情况将DataTypes改为一列或多列。

df = spark.read.csv('test.csv',header=True,inferSchema=True)
fields = []
for f in json.loads(df.schema.json())["fields"]:
    if f["name"] == "col_e":
        fields.append(StructField("col_e", StringType(), True))
    else:
        fields.append(StructField.fromJson(f))

schema = StructType(fields)

@F.pandas_udf(schema, F.PandasUDFType.GROUPED_MAP)
def many_cols_data(pdf):
    pdf['col_e'] = "test"
    return pdf

df.groupBy(
    'col_a'
).apply(
    many_cols_data
).show()

输入文件test.csv

col_a,col_b,col_c,col_d,col_e
a,2,3,4,5
b,2,3,4,5
c,2,3,4,5

结果

+-----+-----+-----+-----+-----+
|col_a|col_b|col_c|col_d|col_e|
+-----+-----+-----+-----+-----+
|    c|    2|    3|    4| test|
|    b|    2|    3|    4| test|
|    a|    2|    3|    4| test|
+-----+-----+-----+-----+-----+
© www.soinside.com 2019 - 2024. All rights reserved.