I have this dataframe:
root
|-- Id: string (nullable = true)
|-- Name: string (nullable = true)
Sample data:
+----+------+
| Id | Name |
+----+------+
| 1  | 'A'  |
| 2  | 'B'  |
+----+------+
I want to add an Items column, a nested structure containing data like this:
{Id:{"Id":Id,"Name":Name}}
The final schema should look like this:
root
|-- Id: string (nullable = true)
|-- Name: string (nullable = true)
|-- Items: struct (nullable = true)
| |-- 1: struct (nullable = true)
| | |-- Id: string (nullable = true)
| | |-- Name: string (nullable = true)
| |-- 2: struct (nullable = true)
| | |-- Id: string (nullable = true)
| | |-- Name: string (nullable = true)
The final data should look like this:
+----+------+---------------------------+
| Id | Name | Items                     |
+----+------+---------------------------+
| 1  | 'A'  | {1:{"Id":1, "Name": "A"}} |
| 2  | 'B'  | {2:{"Id":2, "Name": "B"}} |
+----+------+---------------------------+
I tried this expression:
transform_expr_items = """struct(Id, struct(Id as Id,
Name as Name
))"""
df_tmp = df_test.withColumn("Items", expr(transform_expr_items))
But the resulting schema looks like this:
root
|-- Id: string (nullable = true)
|-- Name: string (nullable = true)
|-- Items: struct (nullable = true)
| |-- Id: string (nullable = true)
| |-- col2: struct (nullable = false)
| | |-- Id: string (nullable = true)
| | |-- Name: string (nullable = true)
Thanks for your help!
You can try defining the schema beforehand and applying it to the data as it is read, like this:
# Required imports (assumes an active SparkSession named `spark`)
import pyspark.sql.functions as F
from pyspark.sql.types import StructType, StructField, StringType

# Create a DataFrame
data = [("1", "A"), ("2", "B")]
df = spark.createDataFrame(data, ["Id", "Name"])
# Add a new Items Column (Items=Map[Id,Name])
df = df.withColumn("Items", F.create_map(F.col("Id"), F.struct(F.col("Id"), F.col("Name"))))
# Define the schema beforehand, then apply to data that is read
new_schema = StructType([
    StructField("Id", StringType(), True),
    StructField("Name", StringType(), True),
    StructField("Items", StructType([
        StructField("1", StructType([
            StructField("Id", StringType(), True),
            StructField("Name", StringType(), True)
        ]), True),
        StructField("2", StructType([
            StructField("Id", StringType(), True),
            StructField("Name", StringType(), True)
        ]), True)
    ]), True)
])
# Apply the new schema, display the transformed data & schema
df_new = spark.createDataFrame(df.rdd, new_schema)
df_new.show(df_new.count(), False)
df_new.printSchema()