I want to pass a list of strings (column names) into a transform function that produces a new column containing an array of structs with two fields, "key" and "value", where key is the column name and value is the corresponding column's value.
from pyspark.sql import Row

data = [
Row(name='Alice', age=30, city='New York'),
Row(name='Bob', age=25, city='Los Angeles'),
Row(name='Charlie', age=35, city='Chicago')
]
rdd = spark.sparkContext.parallelize(data)
df = rdd.toDF()
I currently have:
df_with_column_of_structs = df.withColumn('array_of_structs',
expr(f"TRANSFORM(array(name, age), col -> named_struct('key', 'col', 'val', col))")
)
df_with_column_of_structs.printSchema()
df_with_column_of_structs.show(truncate=False)
# output
|-- name: string (nullable = true)
|-- age: long (nullable = true)
|-- city: string (nullable = true)
|-- array_of_structs: array (nullable = false)
| |-- element: struct (containsNull = false)
| | |-- key: string (nullable = false)
| | |-- val: string (nullable = true)
+-------+---+-----------+---------------------------+
|name |age|city |array_of_structs |
+-------+---+-----------+---------------------------+
|Alice |30 |New York |[{col, Alice}, {col, 30}] |
|Bob |25 |Los Angeles|[{col, Bob}, {col, 25}] |
|Charlie|35 |Chicago |[{col, Charlie}, {col, 35}]|
+-------+---+-----------+---------------------------+
The output I want is:
+-------+---+-----------+----------------------------+
|name   |age|city       |array_of_structs            |
+-------+---+-----------+----------------------------+
|Alice  |30 |New York   |[{name, Alice}, {age, 30}]  |
|Bob    |25 |Los Angeles|[{name, Bob}, {age, 25}]    |
|Charlie|35 |Chicago    |[{name, Charlie}, {age, 35}]|
+-------+---+-----------+----------------------------+
but I can't figure out how to dereference the column name to do this. I tried using `$` and `{}` to dereference it, but both produced syntax errors, i.e.
df_with_column_of_structs = df.withColumn('array_of_structs',
expr(f"TRANSFORM(array(name, age), col -> named_struct('key', {col}, 'val', col))")
)
# output
An error was encountered:
name 'col' is not defined
Traceback (most recent call last):
NameError: name 'col' is not defined
That is not really the output you describe, because a struct is defined as key: value (with a colon), while you are using a comma as the separator (key, value).
Maybe this helps:
from pyspark.sql import Row
from pyspark.sql.types import StructField, StructType, StringType, IntegerType
from pyspark.sql import functions as f
data = [
Row(name='Alice', age=30, city='New York'),
Row(name='Bob', age=25, city='Los Angeles'),
Row(name='Charlie', age=35, city='Chicago')
]
schema = StructType([
    StructField(name="name", dataType=StringType()),
    StructField(name="age", dataType=IntegerType()),
    StructField(name="city", dataType=StringType())
])
df = (
spark.createDataFrame(data=data, schema=schema)
.withColumn("nested_struct", f.array(f.struct("name", "age")))
)
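To get exactly the key/value output asked for in the question: the column name is only known on the Python side, so it cannot be recovered inside the `TRANSFORM` lambda (the `col` there is the SQL lambda variable, and the f-string `{col}` substitutes a Python variable, hence the `NameError`). A sketch of a workaround (the helper name `kv_array_expr` is mine, not from the question) is to build the `named_struct` list in Python from the list of column names and hand the finished expression to `expr`; casting to string is assumed so that mixed column types fit a single `val` field:

```python
# Sketch of a helper (name is mine) that turns a Python list of column
# names into a Spark SQL expression producing an array of key/value
# structs. Values are cast to string so mixed column types share one type.
def kv_array_expr(cols):
    structs = ", ".join(
        f"named_struct('key', '{c}', 'val', cast({c} as string))" for c in cols
    )
    return f"array({structs})"

print(kv_array_expr(['name', 'age']))
# array(named_struct('key', 'name', 'val', cast(name as string)), named_struct('key', 'age', 'val', cast(age as string)))

# Inside a Spark session this would be used as:
# from pyspark.sql.functions import expr
# df_with_column_of_structs = df.withColumn(
#     'array_of_structs', expr(kv_array_expr(['name', 'age']))
# )
```

The same result can also be built purely with the DataFrame API, e.g. `f.array(*[f.struct(f.lit(c).alias('key'), f.col(c).cast('string').alias('val')) for c in cols])`, which avoids string interpolation entirely.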