I am currently trying to update the values of a column in a PySpark DataFrame.
Here is the schema:
root
|-- date: string
|-- ticket: struct
| |-- money: array
| | |-- element: struct
| | | |-- currency: string
| | | |-- total: double
I want to change the value of "total" to None whenever it is -9999.
Here is what I have:
+-------------------+----------------------------+
|date               |ticket                      |
+-------------------+----------------------------+
|2024-02-02T04:31:06|[{CAD, -9999}, {CAD, -9999}]|
|2024-02-02T04:31:06|[{CAD, -9999}, {CAD, -9999}]|
+-------------------+----------------------------+
Here is the final result I am trying to get:
+-------------------+--------------------------+
|date               |ticket                    |
+-------------------+--------------------------+
|2024-02-02T04:31:06|[{CAD, None}, {CAD, None}]|
|2024-02-02T04:31:06|[{CAD, None}, {CAD, None}]|
+-------------------+--------------------------+
Edit: I cannot add new columns; I need to keep this schema unchanged.
Here is a solution using withField:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, ArrayType
from pyspark.sql import functions as F

# Create a SparkSession
spark = SparkSession.builder \
    .appName("CreateDataFrame") \
    .getOrCreate()

# Define the schema
schema = StructType([
    StructField("date", StringType(), True),
    StructField("ticket", StructType([
        StructField("money", ArrayType(StructType([
            StructField("currency", StringType(), True),
            StructField("total", DoubleType(), True)
        ])), True)
    ]), True)
])
# Sample data
data = [
("2024-04-18", [[{"currency": "USD", "total": -9999.0}, {"currency": "EUR", "total": 80.0}]]),
("2024-04-19", [[{"currency": "GBP", "total": -9999.0}, {"currency": "JPY", "total": 1000.0}]])
]
# Create DataFrame
df = spark.createDataFrame(data, schema)
# Show DataFrame
df.show(truncate=False)
# +----------+---------------------------------+
# |date |ticket |
# +----------+---------------------------------+
# |2024-04-18|{[{USD, -9999.0}, {EUR, 80.0}]} |
# |2024-04-19|{[{GBP, -9999.0}, {JPY, 1000.0}]}|
# +----------+---------------------------------+
# Show schema
df.printSchema()
# root
# |-- date: string (nullable = true)
# |-- ticket: struct (nullable = true)
# | |-- money: array (nullable = true)
# | | |-- element: struct (containsNull = true)
# | | | |-- currency: string (nullable = true)
# | | | |-- total: double (nullable = true)
The sample DataFrame above matches the schema from the question. Now replace every `total` equal to -9999 with `None`:
df.withColumn(
    'ticket',
    F.col('ticket').withField(
        'money',
        F.transform(
            'ticket.money',
            lambda c: c.withField(
                'total',
                F.when(c.getField('total') == -9999, None)
                 .otherwise(c.getField('total'))
            )
        )
    )
).show(truncate=False)
# +----------+------------------------------+
# |date |ticket |
# +----------+------------------------------+
# |2024-04-18|{[{USD, null}, {EUR, 80.0}]} |
# |2024-04-19|{[{GBP, null}, {JPY, 1000.0}]}|
# +----------+------------------------------+
There is also a library for this kind of nested-field transformation: pyspark-nested-functions.