How to modify a nested struct column in a PySpark DataFrame

Question (0 votes, 2 answers)

I am trying to anonymize/hash a nested column, but haven't succeeded. The schema looks like this:

|-- abc: struct (nullable = true)
|    |-- xyz: struct (nullable = true)
|    |    |-- abc123: string (nullable = true)
|    |    |-- services: struct (nullable = true)
|    |    |    |-- service: array (nullable = true)
|    |    |    |    |-- element: struct (containsNull = true)
|    |    |    |    |    |-- type: string (nullable = true)
|    |    |    |    |    |-- subtype: string (nullable = true)

I need to change (anonymize/hash) the values of the type column.

dataframe apache-spark pyspark struct apache-spark-sql
2 Answers

6 votes

For Spark 3.1+, there is a Column method withField that can be used to update a struct field.
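For intuition, withField returns a new struct value with the named field replaced (or added), leaving every other field intact. A rough dict-based Python sketch of that behavior (illustrative only, not the Spark API):

```python
def with_field(struct, name, value):
    # Return a copy of the struct with one field replaced/added,
    # all other fields untouched (analogous to Column.withField).
    return {**struct, name: value}

xyz = {"abc123": "value123",
       "services": [{"type": "type1", "subtype": "subtype1"}]}

# Replace only the "services" field; "abc123" is preserved.
updated = with_field(xyz, "services", [])
```

Like its Spark counterpart, the sketch is non-destructive: the original struct is left unchanged and a new one is returned.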

Assuming this is your input dataframe (corresponding to the schema you provided):

from pyspark.sql import Row

df = spark.createDataFrame([
    Row(abc=Row(xyz=Row(abc123="value123", services=[Row(type="type1", subtype="subtype1")])))
])

df.show(truncate=False)
#+---------------------------------+
#|abc                              |
#+---------------------------------+
#|{{value123, [{type1, subtype1}]}}|
#+---------------------------------+

You can use transform on the array services to hash the field type of each struct element (here I use the xxhash64 function to illustrate), like so:

import pyspark.sql.functions as F

df2 = df.withColumn(
    "abc",
    F.col("abc").withField(
        "xyz",
        F.col("abc.xyz").withField(
            "services",
            F.expr("transform(abc.xyz.services, x -> struct(xxhash64(x.type) as type, x.subtype))")
        )
    )
)

df2.show(truncate=False)
#+-----------------------------------------------+
#|abc                                            |
#+-----------------------------------------------+
#|{{value123, [{2134479862461603894, subtype1}]}}|
#+-----------------------------------------------+

For older Spark versions, you need to recreate the whole struct in order to update a field, which becomes tedious when there are many nested fields. In your case it would look like this:

df2 = df.withColumn(
    "abc",
    F.struct(
        F.struct(
            F.col("abc.xyz.abc123"),
            F.expr(
                "transform(abc.xyz.services, x -> struct(xxhash64(x.type) as type, x.subtype))"
            ).alias("services")
        ).alias("xyz")
    )
)
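Conceptually, the transform(...) expression in both variants maps over the services array element-wise, hashing type and passing subtype through. A pure-Python sketch of that mapping (SHA-256 from hashlib stands in for Spark's xxhash64, which has no stdlib equivalent):

```python
import hashlib

def hash_type_field(services):
    # Pure-Python analogue of:
    #   transform(services, x -> struct(hash(x.type) as type, x.subtype))
    # Each element's "type" is replaced by its hash; "subtype" is unchanged.
    return [
        {"type": hashlib.sha256(s["type"].encode()).hexdigest(),
         "subtype": s["subtype"]}
        for s in services
    ]

services = [{"type": "type1", "subtype": "subtype1"}]
hashed = hash_type_field(services)
```

Note that xxhash64 is a fast non-cryptographic hash; if the goal is anonymization, a cryptographic hash such as sha2 may be the more appropriate choice in Spark as well.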

0 votes

Using the hash function from the pyspark-nested-functions library, you can hash any nested field (e.g. "abc.xyz.services.type") with hash_field(df, "abc.xyz.services.type"):

from pyspark.sql import Row
df = spark.createDataFrame([
    Row(abc=Row(xyz=Row(abc123="value123", services=[Row(type="type1", subtype="subtype1")])))
])

df.show(truncate=False)
# +---------------------------------+                                             
# |abc                              |
# +---------------------------------+
# |{{value123, [{type1, subtype1}]}}|
# +---------------------------------+

from nestedfunctions.functions.hash import hash_field

hashed_df = hash_field(df, "abc.xyz.services.type", num_bits=256)
hashed_df.show(truncate=False)
# +--------------------------------------------------------------------------------------------+
# |abc                                                                                         |
# +--------------------------------------------------------------------------------------------+
# |{{value123, [{ba5857c2e8a7c12df14097eaa5ffb1c97976b9d433fe63a65df84849c5eea0ec, subtype1}]}}|
# +--------------------------------------------------------------------------------------------+