I have a PySpark DataFrame in which the column `mappingresult` is a string; some of its values contain a JSON array with two objects:
import pandas as pd

spark.createDataFrame(pd.DataFrame({'server': {0: '3456gj',
1: '56ujdn98',
2: '56v95bd',
3: 'barca6mw2k'},
'logev': {0: 'map.map',
1: 'map.map',
2: 'map.map',
3: 'map.map'},
'status': {0: 'no prod',
1: 'as rules',
2: 'is found',
3: 'not found'},
'mappingresult': {0: '[]',
1: '[]',
2: '[{"model":"s","com":"48b4-bfde","ctid":987456,"params":[{"ID":"tr","val":"399.00"},{"IDp":"merch","val":"stackoverflow"}]},{"model":"s","com":"76r-bfde","ctid":987456,"params":[{"ID":"tr","val":"399.00"},{"IDp":"merch","val":"stackoverflow"}]}]',
3: '[{"model":"s","com":"4827-44e9","ctid":987456,"params":[{"ID":"tr","val":"399.00"},{"IDp":"merch","val":"stackoverflow"}]},{"model":"s","com":"2222-44e9","ctid":987456,"params":[{"ID":"tr","val":"399.00"},{"IDp":"merch","val":"stackoverflow"}]}]'}})).show()
server | logev | status | mappingresult |
---|---|---|---|
3456gj | map.map | no prod | [] |
56ujdn98 | map.map | as rules | [] |
56v95bd | map.map | is found | [{"model":"s","com":"48b4-bfde","ctid":987456,"params":[{"ID":"tr","val":"399.00"},{"IDp":"merch","val":"stackoverflow"}]},{"model":"s","com":"76r-bfde","ctid":987456,"params":[{"ID":"tr","val":"399.00"},{"IDp":"merch","val":"stackoverflow"}]}] |
barca6mw2k | map.map | not found | [{"model":"s","com":"4827-44e9","ctid":987456,"params":[{"ID":"tr","val":"399.00"},{"IDp":"merch","val":"stackoverflow"}]},{"model":"s","com":"2222-44e9","ctid":987456,"params":[{"ID":"tr","val":"399.00"},{"IDp":"merch","val":"stackoverflow"}]}] |
I need to multiply the rows according to the number of elements in the JSON array and parse the JSON objects into columns.
This is the desired result:
server | logev | status | mappingresult | model | com | ctid | params |
---|---|---|---|---|---|---|---|
3456gj | map.map | no prod | [] | s | 48b4-bfde | 987456 | {"ID":"tr","val":"399.00"},{"IDp":"merch","val":"stackoverflow"} |
56ujdn98 | map.map | as rules | [] | s | 76r-bfde | 987456 | {"ID":"tr","val":"399.00"},{"IDp":"merch","val":"stackoverflow"} |
56v95bd | map.map | is found | [{"model":"s","com":"48b4-bfde","ctid":987456,"params":[{"ID":"tr","val":"399.00"},{"IDp":"merch","val":"stackoverflow"}]},{"model":"s","com":"76r-bfde","ctid":987456,"params":[{"ID":"tr","val":"399.00"},{"IDp":"merch","val":"stackoverflow"}]}] | s | 4827-44e9 | 987456 | {"ID":"tr","val":"399.00"},{"IDp":"merch","val":"stackoverflow"} |
barca6mw2k | map.map | not found | [{"model":"s","com":"4827-44e9","ctid":987456,"params":[{"ID":"tr","val":"399.00"},{"IDp":"merch","val":"stackoverflow"}]},{"model":"s","com":"2222-44e9","ctid":987456,"params":[{"ID":"tr","val":"399.00"},{"IDp":"merch","val":"stackoverflow"}]}] | s | 2222-44e9 | 987456 | {"ID":"tr","val":"399.00"},{"IDp":"merch","val":"stackoverflow"} |
I know how to do this with a plain string, but I can't apply it to the DataFrame:
json_string = """
[{"model":"s","com":"48b4-bfde","ctid":987456,"params":[{"ID":"tr","val":"399.00"},{"IDp":"merch","val":"stackoverflow"}]},{"model":"s","com":"76r-bfde","ctid":987456,"params":[{"ID":"tr","val":"399.00"},{"IDp":"merch","val":"stackoverflow"}]}]
"""
spark.read.json(spark.sparkContext.parallelize([json_string])).show(vertical=True)
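For sanity-checking what the expansion should produce, the same expand-and-flatten logic can be sketched in plain Python with the `json` module (a minimal sketch using the sample array above; the column order just mirrors the desired table):

```python
import json

json_string = '[{"model":"s","com":"48b4-bfde","ctid":987456,"params":[{"ID":"tr","val":"399.00"},{"IDp":"merch","val":"stackoverflow"}]},{"model":"s","com":"76r-bfde","ctid":987456,"params":[{"ID":"tr","val":"399.00"},{"IDp":"merch","val":"stackoverflow"}]}]'

# One output row per element of the JSON array; each struct field
# (model, com, ctid, params) becomes its own column.
rows = [
    (item["model"], item["com"], item["ctid"], item["params"])
    for item in json.loads(json_string)
]
for model, com, ctid, params in rows:
    print(model, com, ctid, params)
```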
Use `from_json` with a schema describing the required columns, and use `inline` to flatten the resulting array of structs.
Schema -
array<struct<model:string, com:string,ctid:bigint, params: array<map<string,string>>>>
df.selectExpr(
    """inline(
        from_json(
            mappingresult,
            'array<struct<model:string, com:string, ctid:bigint, params:array<map<string,string>>>>'
        )
    )"""
).show(10, False)
+-----+---------+------+-----------------------------------------------------------------+
|model|com |ctid |params |
+-----+---------+------+-----------------------------------------------------------------+
|s |4827-44e9|987456|[{ID -> tr, val -> 399.00}, {IDp -> merch, val -> stackoverflow}]|
|s |2222-44e9|987456|[{ID -> tr, val -> 399.00}, {IDp -> merch, val -> stackoverflow}]|
+-----+---------+------+-----------------------------------------------------------------+