PySpark withColumn() function does not recognize the hierarchy

Problem description

I have a task to convert a nested structure of struct type into an array type that contains a struct. For this I followed the approach of using the withColumn() function. The official documentation says that this function creates a new column or replaces an existing column with the same name.

I have this dummy data:

dummy_data = [{
    "return": {
        "a": {
            "b": [
                {
                    "id": 1111,
                    "ts1": "1990-03-11T00:00:00+01:00",
                    "ts2": "1990-03-28T00:00:00+02:00",
                    "c": "C",
                    "d": 112,
                    "name": "Name",
                    "obj": {
                        "obj2": {
                            "id": 111,
                            "col1": True,
                            "col2": 1,
                            "col3": 2,
                            "col4": 4097
                        }
                    }
                }
            ]
        }
    }
}]

Initial schema:

from pyspark.sql.types import (
    StructType, StructField, ArrayType,
    LongType, StringType, BooleanType,
)

dummy_schema = StructType([
    StructField("return", StructType([
        StructField("a", StructType([
            StructField("b", ArrayType(StructType([
                StructField("id", LongType(), nullable=True),
                StructField("ts1", StringType(), nullable=True),
                StructField("ts2", StringType(), nullable=True),
                StructField("conf", StringType(), nullable=True),
                StructField("d", LongType(), nullable=True),
                StructField("name", StringType(), nullable=True),
                StructField("obj", StructType([
                    StructField("obj2", StructType(StructType([
                        StructField("id", LongType(), nullable=True),
                        StructField("col1", BooleanType(), nullable=True),
                        StructField("col2", LongType(), nullable=True),
                        StructField("col3", LongType(), nullable=True),
                        StructField("col4", LongType(), nullable=True)
                    ]))
                ]))
            ])))
        ]))
    ]))
])
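
To make this reproducible end to end, the DataFrame can be built like this (a minimal sketch, assuming an active SparkSession; spark.createDataFrame accepts the list of dicts together with the explicit schema):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
dummy_df = spark.createDataFrame(dummy_data, schema=dummy_schema)
dummy_df.printSchema()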

Result of printSchema():

root
 |-- return: struct (nullable = true)
 |    |-- a: struct (nullable = true)
 |    |    |-- b: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- id: long (nullable = true)
 |    |    |    |    |-- ts1: string (nullable = true)
 |    |    |    |    |-- ts2: string (nullable = true)
 |    |    |    |    |-- conf: string (nullable = true)
 |    |    |    |    |-- d: long (nullable = true)
 |    |    |    |    |-- name: string (nullable = true)
 |    |    |    |    |-- obj: struct (nullable = true)
 |    |    |    |    |    |-- obj2: struct (nullable = true)
 |    |    |    |    |    |    |-- id: long (nullable = true)
 |    |    |    |    |    |    |-- col1: boolean (nullable = true)
 |    |    |    |    |    |    |-- col2: long (nullable = true)
 |    |    |    |    |    |    |-- col3: long (nullable = true)
 |    |    |    |    |    |    |-- col4: long (nullable = true)

I tried using the withColumn() function:

from pyspark.sql.functions import array, col

df = dummy_df.withColumn('return.a.b.obj.obj2', array(col('return.a.b.obj.obj2')))

However, it does not treat the first argument as hierarchical access into the nested struct for further transformation.

After executing this, I get the following result: the DataFrame simply gains a new top-level column whose name is the literal string return.a.b.obj.obj2, and the nested obj2 field is left unchanged.
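
The same behaviour can be seen by inspecting the columns of the df produced above (the commented output reflects what withColumn does with a dotted name; it is not copied from a screenshot):

print(df.columns)
# ['return', 'return.a.b.obj.obj2']  <- a new flat column; the nested hierarchy is untouched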

I expect obj2 to be an array type with the following schema:

"array<struct<id:long,col1:boolean,col2:long,col3:long,col4:long>>"

with the hierarchy preserved.

json apache-spark pyspark databricks
1 Answer

There may well be a cleaner solution, but you can define a function that changes obj2 from a struct to an array, and call that function on the array column b (the reason I did it this way is that I could not figure out how else to get at a field nested inside an array element).

from pyspark.sql import functions as F

def modify_conf(array_column):
    # Rebuild every element of the given array column, keeping the scalar
    # fields as-is and wrapping the obj.obj2 struct in a single-element array.
    return F.expr("""
        transform({array_column}, 
        x -> struct(
            x.id, 
            x.ts1, 
            x.ts2, 
            x.conf, 
            x.d, 
            x.name,
            struct(
                array(
                    struct(
                        x.obj.obj2.id as id,
                        x.obj.obj2.col1 as col1,
                        x.obj.obj2.col2 as col2,
                        x.obj.obj2.col3 as col3,
                        x.obj.obj2.col4 as col4
                    )
                ) as obj2
            ) as obj
        ))""".format(array_column=array_column))

We can then use .withColumn, together with withField to step through the nesting down to b, and apply modify_conf to rewrite the remaining fields:

newDF = dummy_df.withColumn(
    "return",
    F.col("return").withField(
        "a",
        F.col("return.a").withField(
            "b",
            modify_conf("return.a.b")
        )
    )
)
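
If your Spark version lets withField take a dotted path into a nested struct (my sketch; newer versions accept "a.b" here, but it is worth verifying on your version), the same result can be written a bit more compactly:

newDF = dummy_df.withColumn(
    "return",
    F.col("return").withField("a.b", modify_conf("return.a.b")),
)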

newDF then has the following schema:

root
 |-- return: struct (nullable = true)
 |    |-- a: struct (nullable = true)
 |    |    |-- b: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = false)
 |    |    |    |    |-- id: long (nullable = true)
 |    |    |    |    |-- ts1: string (nullable = true)
 |    |    |    |    |-- ts2: string (nullable = true)
 |    |    |    |    |-- conf: string (nullable = true)
 |    |    |    |    |-- d: long (nullable = true)
 |    |    |    |    |-- name: string (nullable = true)
 |    |    |    |    |-- obj: struct (nullable = false)
 |    |    |    |    |    |-- obj2: array (nullable = false)
 |    |    |    |    |    |    |-- element: struct (containsNull = false)
 |    |    |    |    |    |    |    |-- id: long (nullable = true)
 |    |    |    |    |    |    |    |-- col1: boolean (nullable = true)
 |    |    |    |    |    |    |    |-- col2: long (nullable = true)
 |    |    |    |    |    |    |    |-- col3: long (nullable = true)
 |    |    |    |    |    |    |    |-- col4: long (nullable = true)
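
A quick way to confirm that obj2 now has the type the question asked for (a sketch; note that simpleString() spells LongType as bigint rather than long):

obj2_type = (
    newDF.schema["return"].dataType["a"].dataType["b"]
    .dataType.elementType["obj"].dataType["obj2"].dataType
)
print(obj2_type.simpleString())
# array<struct<id:bigint,col1:boolean,col2:bigint,col3:bigint,col4:bigint>>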