How to update values inside a nested array in Spark


I am currently trying to update the values of a column in a PySpark DataFrame.

Here is the schema:

root
|-- date: string
|-- ticket: struct
|    |-- money: array
|    |    |-- element: struct
|    |    |    |-- currency: string
|    |    |    |-- total: double

I want to change the value of "total" to None wherever it is -9999.

Here is what I have:

+-------------------+----------------------------+
|date               |ticket                      |
+-------------------+----------------------------+
|2024-02-02T04:31:06|[{CAD, -9999}, {CAD, -9999}]|
|2024-02-02T04:31:06|[{CAD, -9999}, {CAD, -9999}]|
+-------------------+----------------------------+

Here is the final result I am trying to get:

+-------------------+--------------------------+
|date               |ticket                    |
+-------------------+--------------------------+
|2024-02-02T04:31:06|[{CAD, None}, {CAD, None}]|
|2024-02-02T04:31:06|[{CAD, None}, {CAD, None}]|
+-------------------+--------------------------+

Edit: I cannot add a new column; I need to keep this schema unchanged.

apache-spark pyspark
1 Answer

Here is a solution using withField:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, ArrayType
from pyspark.sql import functions as F

# Create a SparkSession
spark = SparkSession.builder \
    .appName("CreateDataFrame") \
    .getOrCreate()

# Define the schema
schema = StructType([
    StructField("date", StringType(), True),
    StructField("ticket", StructType([
        StructField("money", ArrayType(StructType([
            StructField("currency", StringType(), True),
            StructField("total", DoubleType(), True)
        ])), True)
    ]), True)
])


# Sample data: each ticket struct is written as a one-element list whose
# single entry is the money array (a list of {currency, total} dicts)
data = [
    ("2024-04-18", [[{"currency": "USD", "total": -9999.0}, {"currency": "EUR", "total": 80.0}]]),
    ("2024-04-19", [[{"currency": "GBP", "total": -9999.0}, {"currency": "JPY", "total": 1000.0}]])
]

# Create DataFrame
df = spark.createDataFrame(data, schema)

# Show DataFrame
df.show(truncate=False)
# +----------+---------------------------------+
# |date      |ticket                           |
# +----------+---------------------------------+
# |2024-04-18|{[{USD, -9999.0}, {EUR, 80.0}]}  |
# |2024-04-19|{[{GBP, -9999.0}, {JPY, 1000.0}]}|
# +----------+---------------------------------+

# Show schema
df.printSchema()
# root
#  |-- date: string (nullable = true)
#  |-- ticket: struct (nullable = true)
#  |    |-- money: array (nullable = true)
#  |    |    |-- element: struct (containsNull = true)
#  |    |    |    |-- currency: string (nullable = true)
#  |    |    |    |-- total: double (nullable = true)

That is what the sample DataFrame looks like. Now change every negative "total" value (which covers the -9999 sentinel) to None:

df.withColumn(
    "ticket",
    F.col("ticket").withField(
        "money",
        F.transform(
            "ticket.money",
            lambda c: c.withField(
                "total",
                F.when(c.getField("total") < 0, None).otherwise(c.getField("total")),
            ),
        ),
    ),
).show(truncate=False)
# +----------+------------------------------+
# |date      |ticket                        |
# +----------+------------------------------+
# |2024-04-18|{[{USD, null}, {EUR, 80.0}]}  |
# |2024-04-19|{[{GBP, null}, {JPY, 1000.0}]}|
# +----------+------------------------------+
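
Since the edit in the question requires the schema to stay intact, it is worth capturing the result and comparing schemas explicitly. A minimal check (df2 is just a name for the transformed frame); this should hold because withField only rewrites values, not field names or types:

# Same expression as above, assigned instead of shown
df2 = df.withColumn(
    "ticket",
    F.col("ticket").withField(
        "money",
        F.transform(
            "ticket.money",
            lambda c: c.withField(
                "total",
                F.when(c.getField("total") < 0, None).otherwise(c.getField("total")),
            ),
        ),
    ),
)

# withField preserves the struct layout, so the schemas should match
assert df2.schema == df.schema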

There is also a library for this kind of nested-field transformation: pyspark-nested-functions.
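
Separately, if you are on a Spark version older than 3.1, where Column.withField and pyspark.sql.functions.transform are not available, a rough equivalent can be written with the SQL transform higher-order function (available since Spark 2.4) via F.expr. This is only a sketch: it rebuilds the ticket struct with named_struct, so double-check the nullability flags of the resulting schema if you need an exact match.

# Sketch for Spark 2.4 - 3.0: null out negative totals with a SQL
# higher-order `transform`, rebuilding each element struct by hand
df_old = df.withColumn(
    "ticket",
    F.expr("""
        named_struct(
            'money',
            transform(
                ticket.money,
                x -> named_struct(
                    'currency', x.currency,
                    'total', IF(x.total < 0, CAST(NULL AS DOUBLE), x.total)
                )
            )
        )
    """),
)
df_old.show(truncate=False)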
