数据看起来像这样 -
+-----------+-----------+-----------------------------+
| id| point| data|
+-----------------------------------------------------+
| abc| 6|{"key1":"124", "key2": "345"}|
| dfl| 7|{"key1":"777", "key2": "888"}|
| 4bd| 6|{"key1":"111", "key2": "788"}|
我正在尝试将其分解为以下格式。
+-----------+-----------+-----------+-----------+
| id| point| key1| key2|
+------------------------------------------------
| abc| 6| 124| 345|
| dfl| 7| 777| 888|
| 4bd| 6| 111| 788|
explode
函数将数据框分解为多行。但这不是理想的解决方案。
注意:此解决方案不能回答我的问题。 PySpark 在列中“爆炸”字典
pyspark.sql.functions.from_json
应该会得到您想要的结果,但您需要首先定义所需的 schema
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType
schema = StructType(
[
StructField('key1', StringType(), True),
StructField('key2', StringType(), True)
]
)
df.withColumn("data", from_json("data", schema))\
.select(col('id'), col('point'), col('data.*'))\
.show()
这应该给你
+---+-----+----+----+
| id|point|key1|key2|
+---+-----+----+----+
|abc| 6| 124| 345|
|df1| 7| 777| 888|
|4bd| 6| 111| 788|
+---+-----+----+----+
正如@pault 所建议的,数据字段是一个
string
字段。由于行上的 JSON 字符串中的键是相同的(即“key1”、“key2”),您也可以使用 json_tuple()
(根据文档,此函数是版本 1.6 中的新增功能)
from pyspark.sql import functions as F
df.select('id', 'point', F.json_tuple('data', 'key1', 'key2').alias('key1', 'key2')).show()
下面是我的原始帖子:如果原始表来自df.show(truncate=False)
并且因此
data
字段不是Python数据结构,那么这很可能是错误。
由于您已将数据分解为行,所以我认为列
data
是Python数据结构而不是字符串:
from pyspark.sql import functions as F
df.select('id', 'point', F.col('data').getItem('key1').alias('key1'), F.col('data')['key2'].alias('key2')).show()
正如 @jxc 所提到的,如果您无法事先定义模式并且只需要处理单个级别的 json 字符串,则
json_tuple
应该可以正常工作。我认为它更直接且更易于使用。奇怪的是,我之前没有发现其他人提到过这个功能。
在我的用例中,原始数据框架构:
StructType(List(StructField(a,StringType,true)))
,json 字符串列显示为:
+---------------------------------------+
|a |
+---------------------------------------+
|{"k1": "v1", "k2": "2", "k3": {"m": 1}}|
|{"k1": "v11", "k3": "v33"} |
|{"k1": "v13", "k2": "23"} |
+---------------------------------------+
使用
json_tuple
将 json 字段扩展为新列:
from pyspark.sql import functions as F
df = df.select(F.col('a'),
F.json_tuple(F.col('a'), 'k1', 'k2', 'k3') \
.alias('k1', 'k2', 'k3'))
df.schema
df.show(truncate=False)
文档并没有对此说太多,但至少在我的用例中,
json_tuple
提取的新列是StringType
,并且它只提取单个深度的JSON字符串。
StructType(List(StructField(k1,StringType,true),StructField(k2,StringType,true),StructField(k3,StringType,true)))
+---------------------------------------+---+----+-------+
|a |k1 |k2 |k3 |
+---------------------------------------+---+----+-------+
|{"k1": "v1", "k2": "2", "k3": {"m": 1}}|v1 |2 |{"m":1}|
|{"k1": "v11", "k3": "v33"} |v11|null|v33 |
|{"k1": "v13", "k2": "23"} |v13|23 |null |
+---------------------------------------+---+----+-------+
在这种方法中,您只需要设置包含 Json 内容的列的名称。 无需设置架构。它会自动完成一切。
json_col_name = 'data'
keys = df.select(f"{json_col_name}.*").columns
jsonFields= [f"{json_col_name}.{key} {key}" for key in keys]
main_fields = [key for key in df.columns if key != json_col_name]
df_new = df.selectExpr(main_fields + jsonFields)
这适用于我的用例
data1 = spark.read.parquet(path)
json_schema = spark.read.json(data1.rdd.map(lambda row: row.json_col)).schema
data2 = data1.withColumn("data", from_json("json_col", json_schema))
col1 = data2.columns
col1.remove("data")
col2 = data2.select("data.*").columns
append_str ="data."
col3 = [append_str + val for val in col2]
col_list = col1 + col3
data3 = data2.select(*col_list).drop("json_col")
所有功劳归于Shrikant Prabhu
您可以简单地使用SQL
SELECT id, point, data.*
FROM original_table
像这样,如果数据发生变化,新表的架构将进行调整,并且您无需在管道中执行任何操作。