How to update a value in a nested struct column using PySpark

Problem description | votes: 0 | answers: 3

I am trying to do something very simple: update the value of a nested column. However, I can't figure out how to do it.

Environment:

  1. Apache Spark 2.4.5
  2. Databricks 6.4
  3. Python 3.7
from pyspark.sql.functions import lit
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

dataDF = [
  (('Jon','','Smith'),'1580-01-06','M',3000)
]


schema = StructType([
        StructField('name', StructType([
             StructField('firstname', StringType(), True),
             StructField('middlename', StringType(), True),
             StructField('lastname', StringType(), True)
             ])),
         StructField('dob', StringType(), True),
         StructField('gender', StringType(), True),
         StructField('salary', IntegerType(), True)
         ])


df = spark.createDataFrame(data = dataDF, schema = schema)
df = df.withColumn("name.firstname", lit('John'))
df.printSchema()
df.show()

# Results
# I get a new column instead of an update

root
 |-- name: struct (nullable = true)
 |    |-- firstname: string (nullable = true)
 |    |-- middlename: string (nullable = true)
 |    |-- lastname: string (nullable = true)
 |-- dob: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: integer (nullable = true)
 |-- name.firstname: string (nullable = false)

+--------------+----------+------+------+--------------+
|          name|       dob|gender|salary|name.firstname|
+--------------+----------+------+------+--------------+
|[Jon, , Smith]|1580-01-06|     M|  3000|          John|
+--------------+----------+------+------+--------------+
python apache-spark pyspark apache-spark-sql
3 Answers
12 votes

For Spark 3.1+, you can use withField on the struct column. Per the docs, it is "an expression that adds/replaces a field in a StructType by name."

import pyspark.sql.functions as F

df1 = df.withColumn("name", F.col("name").withField("firstname", F.lit("John")))
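
If more than one nested field has to change, withField calls can be chained on the same struct column. A minimal sketch, assuming the df from the question (the second replacement value is purely illustrative):

import pyspark.sql.functions as F

# Chain withField to replace several fields of the struct in one pass
df1 = df.withColumn(
    "name",
    F.col("name")
     .withField("firstname", F.lit("John"))
     .withField("middlename", F.lit("A"))  # illustrative value, not from the question
)
df1.select("name.*").show()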

7 votes

You need to wrangle the columns a bit, like this:

import pyspark.sql.functions as F

# Flatten the struct fields to top-level columns, overwrite the flattened
# 'firstname', rebuild the 'name' struct from the updated columns, then
# drop the temporary flattened columns.
df2 = df.select('*', 'name.*') \
        .withColumn('firstname', F.lit('newname')) \
        .withColumn('name', F.struct(*[F.col(col) for col in df.select('name.*').columns])) \
        .drop(*df.select('name.*').columns)

df2.show()
+------------------+----------+------+------+
|              name|       dob|gender|salary|
+------------------+----------+------+------+
|[newname, , Smith]|1580-01-06|     M|  3000|
+------------------+----------+------+------+
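
On Spark 2.4 (the version in the question), another option is to rebuild the struct in place instead of flattening and dropping columns. A minimal sketch, assuming the same df and column names as in the question:

import pyspark.sql.functions as F

# Recreate the 'name' struct, replacing firstname and copying the other fields
df3 = df.withColumn(
    "name",
    F.struct(
        F.lit("John").alias("firstname"),
        F.col("name.middlename").alias("middlename"),
        F.col("name.lastname").alias("lastname"),
    )
)
df3.show()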

0 votes

The pyspark-nested-functions library allows for readable code:

from nestedfunctions.functions.terminal_operations import apply_terminal_operation
from pyspark.sql.functions import when
processed = apply_terminal_operation(
      df,
      field = "name.firstname",
      f = lambda x, type: when(x=='Jon', 'John').otherwise(x)
  )
processed.show()
# +---------------+----------+------+------+
# |           name|       dob|gender|salary|
# +---------------+----------+------+------+
# |{John, , Smith}|1580-01-06|     M|  3000|
# +---------------+----------+------+------+