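I have a scenario where I want to fully flatten the JSON data held in a string payload into separate columns and load it into a PySpark DataFrame for further processing. The structure of the raw data is not fixed; the payload in particular keeps changing.
Sample data: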
{"id0":"d376","name":"data2","time_stamp":"2024-03-27", "payload":"{\"id\":\"d16\",\"time\":\"2000-01-01\",\"type\":\"XYZ\",\"detail\":{\"idx\":\"abc\",\"name\":\"abc\"},\"owner\":{\"id\":\"123\",\"comunicate\":true},\"order\":{\"id\":\"2ff4973580\"},\"product\":{\"id4\":\"aiobfd8315\",\"name2\":\"fidoufodbf\"},\"transaction\":{\"amount\":90 ,\"category\":\"ugfiodfu\",\"create\":\"2024-03-27\",\"type\":\"idugoiuiofg\",\"gid\":\"d4ad\",\"id\":\"61c1025929e\",\"i_id\":\"3b6085400\",\"m_id\":\"1922c45\",\"prett\":null,\"pro_id\":\"a21b31f8315\",\"status\":\"waiting\",\"data2\":{\"create\":\"2000-01-01\",\"delivered\":yes,\"cla\":[\"irugreogeru\",\"fdijfdiog\"],\"id\":\"5542cefba043\",\"items\":[{\"quantity\":\"1\",\"price\":\"564\"}],\"approve\":null,\"tid\":\"6b1025929e\",\"update\":\"2001-02-02\"},\"t_extra\":{\"b_address\":{\"city\":\"\",\"country\":\"\",\"line1\":\"\",\"line2\":\"\",\"state\":\"\"},\"amount\":0,\"display\":{},\"id\":\"d06f5b502d64\",\"metadata\":{\"id4\":\"m044zi1gi\",\"email\":\"[email protected]\"},\"address\":{\"city\":\"\",\"country\":\"\",\"line1\":\"\",\"line2\":\"\",\"code\":\"\",\"state\":\"\"},\"update\":\"2000-01-01\"},\"token\":\"14849f64fc\",\"id5\":\"fbd2cceee\"},\"user\":{\"id\":\"fbdcceee\",\"number\":\"777777777\"},\"sum\":\"1\"}"}
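My file contains many lines of JSON data like this. The goal is to drop all the outer columns of the sample JSON and flatten every inner column inside the payload string attribute.
The JSON is dynamic, so the usual approach of defining a schema with StructType, ArrayType, and so on (that is, one schema per event type) is not much help and is limiting, because the data keeps changing and every column needs to be flattened.
The closest I have come is the code below, which still leaves the payload wrapped in a single column of key-value pairs instead of extracting the keys into separate columns.
Code: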
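import json

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import MapType, StringType


def flatten_json(y):
    """Flatten nested dicts/lists into one dict keyed by dotted paths."""
    out = {}

    def flatten(x, name=''):
        if type(x) is dict:
            for a in x:
                flatten(x[a], name + a + '.')
        elif type(x) is list:
            i = 0
            for a in x:
                flatten(a, name + str(i) + '.')
                i += 1
        else:
            # Leaf value: drop the trailing '.' from the accumulated path
            out[name[:-1]] = x

    flatten(y)
    return out


spark = SparkSession.builder \
    .appName("App") \
    .getOrCreate()

# path is the location of the input file (defined elsewhere)
df = spark.read.json(path)
df = df.select('payload')

# The UDF returns one map value per row, so the result is a single map column
flatten_udf = udf(lambda json_str: flatten_json(json.loads(json_str)), MapType(StringType(), StringType()))
df_flattened = df.withColumn("payload", flatten_udf("payload"))
df_flattened.show(truncate=False)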
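To make the behaviour concrete, here is the same flattening logic run outside Spark on a trimmed, made-up payload (the sample string is an assumption, not the real data). It is only a sketch, but it shows that each row yields one dict, which is why the UDF produces a single map column, and that empty objects such as "display": {} leave no key at all.

```python
import json


def flatten_json(y):
    """Flatten nested dicts/lists into one dict keyed by dotted paths."""
    out = {}

    def flatten(x, name=""):
        if isinstance(x, dict):
            for key, value in x.items():
                flatten(value, name + key + ".")
        elif isinstance(x, list):
            for i, item in enumerate(x):
                flatten(item, name + str(i) + ".")
        else:
            # Leaf value: drop the trailing '.' from the accumulated path
            out[name[:-1]] = x

    flatten(y)
    return out


# Hypothetical, shortened payload for illustration only
payload = '{"id": "d16", "detail": {"idx": "abc"}, "cla": ["a", "b"], "display": {}}'
flat = flatten_json(json.loads(payload))
print(flat)
# {'id': 'd16', 'detail.idx': 'abc', 'cla.0': 'a', 'cla.1': 'b'}
```

List elements get their index as a path segment (cla.0, cla.1), so arrays of different lengths produce different key sets from row to row.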
Expected: separate DataFrame columns, for example:
id -> string column
time -> string column
type -> string column
detail.idx -> string column
detail.name -> string column
owner.id -> string column
owner.communicate -> string column
... etc., all other columns
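One way to think about the expected layout is as the union of all flattened keys across rows, with missing keys filled in as nulls. The plain-Python sketch below illustrates that idea on two made-up payloads; the payloads and the flatten helper are assumptions for illustration, and nothing here is Spark-specific.

```python
import json


def flatten(value, prefix=""):
    """Yield (dotted_path, leaf) pairs for nested dicts and lists."""
    if isinstance(value, dict):
        for key, child in value.items():
            yield from flatten(child, f"{prefix}{key}.")
    elif isinstance(value, list):
        for index, child in enumerate(value):
            yield from flatten(child, f"{prefix}{index}.")
    else:
        yield prefix[:-1], value


# Hypothetical payloads with different shapes
payloads = [
    '{"id": "d16", "detail": {"idx": "abc", "name": "abc"}}',
    '{"id": "d17", "owner": {"id": "123", "comunicate": true}}',
]

flat_rows = [dict(flatten(json.loads(p))) for p in payloads]

# Union of keys, in first-seen order, gives the column list
columns = list(dict.fromkeys(key for row in flat_rows for key in row))

# Align every row to the full column list; missing keys become None
table = [[row.get(column) for column in columns] for row in flat_rows]

print(columns)
for line in table:
    print(line)
```

The resulting column list and aligned rows have the shape a DataFrame constructor would need, though collecting every key on the driver like this may not scale to large inputs.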
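PySpark has a handy function, schema_of_json, which derives the schema of a JSON string; that schema can then be applied to the whole column.
So a dynamic JSON payload can be handled as follows:
First, take the json_payload from the first row of the DataFrame.
Use schema_of_json to create a schema from that json_payload.
Parse every row with that schema using from_json. If a row was parsed correctly, it will contain no null values.
If null values are present, those rows were not parsed correctly.
So we cast the parsed rows back to strings and use .contains to check whether the string column contains the text null.
This gives two DataFrames: one holding the correctly parsed rows and one holding the incorrectly parsed rows.
Now repeat the process on the incorrectly parsed DataFrame.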
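The partition-and-repeat idea in the steps above can be sketched without Spark. The plain-Python analogy below groups payloads by their structural shape instead of checking for nulls; signature is a hypothetical helper and the payloads are made up. An exact shape match is stricter than the null check described above, so treat this as an illustration of the loop, not as an equivalent of it.

```python
import json


def signature(value):
    """Hypothetical helper: describe the shape of a parsed JSON value, ignoring the data."""
    if isinstance(value, dict):
        return tuple(sorted((key, signature(child)) for key, child in value.items()))
    if isinstance(value, list):
        # Use the first element as representative of the array's element shape
        return ("array", signature(value[0]) if value else None)
    return type(value).__name__


payloads = [
    '{"id": 1, "name": "Alice"}',
    '{"id": 2, "tags": ["spark", "python"], "active": true}',
    '{"id": 5, "name": "Bob"}',
    '{"id": 3, "details": {"age": 30, "location": "New York"}}',
]

remaining = payloads
groups = []
while remaining:
    # Take the shape of the first remaining row as the reference "schema"
    reference = signature(json.loads(remaining[0]))
    matched = [p for p in remaining if signature(json.loads(p)) == reference]
    unmatched = [p for p in remaining if signature(json.loads(p)) != reference]
    groups.append(matched)
    # Repeat on the rows that did not fit the reference shape
    remaining = unmatched

for group in groups:
    print(group)
```

Each pass removes at least the first remaining row, so the loop always terminates; the two rows that share the id/name shape end up in the same group.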
Here is a sample script with custom data.
from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import col, from_json, schema_of_json
from pyspark.sql.types import StringType, StructType, StructField

spark = SparkSession.builder \
    .appName("JsonPayloadDataFrame") \
    .getOrCreate()

schema = StructType([
    StructField("json_payload", StringType(), True)
])

data = [
    Row(json_payload='{"id": 1, "name": "Alice"}'),
    Row(json_payload='{"id": 2, "tags": ["spark", "python"], "active": true}'),
    Row(json_payload='{"id": 3, "details": {"age": 30, "location": "New York"}}'),
    Row(json_payload='{"id": 4, "history": [{"date": "2023-01-01", "action": "login"}, {"date": "2023-01-02", "action": "logout"}]}')
]

df = spark.createDataFrame(data, schema=schema)
df.printSchema()
df.show(n=30, truncate=False)

wanted_df_list = []
df_looped = df

# DataFrame.isEmpty() requires a recent PySpark release
while not df_looped.isEmpty():
    print("USING SCHEMA FROM ONE ROW")
    # Take the JSON string of the first remaining row as the schema sample
    current_string_repr = df_looped.limit(1).select(col("json_payload")).rdd.map(lambda x: x["json_payload"]).collect()[0]
    print(f"{current_string_repr=}")

    # Parse every remaining row with the schema inferred from that one row
    df_parsed = df_looped.withColumn("parsed_struct", from_json(col("json_payload"), schema_of_json(current_string_repr)))
    df_parsed.show(n=30, truncate=False)
    df_parsed.printSchema()

    # Cast the parsed struct back to a string so that nulls become visible as text
    df_null_check = df_parsed.withColumn("null_present_str", col("parsed_struct").cast(StringType()))
    df_null_check.show(n=30, truncate=False)
    df_null_check.printSchema()

    # Flag rows whose string form contains the text "null"
    df_null_present = df_null_check.withColumn("null_present_bool", col("null_present_str").contains("null"))
    df_null_present.show(n=30, truncate=False)
    df_null_present.printSchema()

    # Split into rows that fit the schema and rows that do not
    df_partial_correct = df_null_present.filter(col("null_present_bool") == False)
    df_partial_incorrect = df_null_present.filter(col("null_present_bool") == True)

    df_partial_correct.cache().show(n=30, truncate=False)
    df_partial_correct.printSchema()
    df_partial_incorrect.cache().show(n=30, truncate=False)
    df_partial_incorrect.printSchema()

    wanted_df_list.append(df_partial_correct)
    # Repeat with the rows that did not fit
    df_looped = df_partial_incorrect

for df_ele in wanted_df_list:
    df_ele.show(n=30, truncate=False)
Final output:
+--------------------------+-------------+----------------+-----------------+
|json_payload |parsed_struct|null_present_str|null_present_bool|
+--------------------------+-------------+----------------+-----------------+
|{"id": 1, "name": "Alice"}|{1, Alice} |{1, Alice} |false |
+--------------------------+-------------+----------------+-----------------+
+------------------------------------------------------+--------------------------+--------------------------+-----------------+
|json_payload |parsed_struct |null_present_str |null_present_bool|
+------------------------------------------------------+--------------------------+--------------------------+-----------------+
|{"id": 2, "tags": ["spark", "python"], "active": true}|{true, 2, [spark, python]}|{true, 2, [spark, python]}|false |
+------------------------------------------------------+--------------------------+--------------------------+-----------------+
+---------------------------------------------------------+-------------------+-------------------+-----------------+
|json_payload |parsed_struct |null_present_str |null_present_bool|
+---------------------------------------------------------+-------------------+-------------------+-----------------+
|{"id": 3, "details": {"age": 30, "location": "New York"}}|{{30, New York}, 3}|{{30, New York}, 3}|false |
+---------------------------------------------------------+-------------------+-------------------+-----------------+
+-------------------------------------------------------------------------------------------------------------+------------------------------------------------+------------------------------------------------+-----------------+
|json_payload |parsed_struct |null_present_str |null_present_bool|
+-------------------------------------------------------------------------------------------------------------+------------------------------------------------+------------------------------------------------+-----------------+
|{"id": 4, "history": [{"date": "2023-01-01", "action": "login"}, {"date": "2023-01-02", "action": "logout"}]}|{[{login, 2023-01-01}, {logout, 2023-01-02}], 4}|{[{login, 2023-01-01}, {logout, 2023-01-02}], 4}|false |
+-------------------------------------------------------------------------------------------------------------+------------------------------------------------+------------------------------------------------+-----------------+
Full output:
USING SCHEMA FROM ONE ROW
current_string_repr='{"id": 1, "name": "Alice"}'
+-------------------------------------------------------------------------------------------------------------+-------------+
|json_payload |parsed_struct|
+-------------------------------------------------------------------------------------------------------------+-------------+
|{"id": 1, "name": "Alice"} |{1, Alice} |
|{"id": 2, "tags": ["spark", "python"], "active": true} |{2, NULL} |
|{"id": 3, "details": {"age": 30, "location": "New York"}} |{3, NULL} |
|{"id": 4, "history": [{"date": "2023-01-01", "action": "login"}, {"date": "2023-01-02", "action": "logout"}]}|{4, NULL} |
+-------------------------------------------------------------------------------------------------------------+-------------+
+-------------------------------------------------------------------------------------------------------------+-------------+----------------+
|json_payload |parsed_struct|null_present_str|
+-------------------------------------------------------------------------------------------------------------+-------------+----------------+
|{"id": 1, "name": "Alice"} |{1, Alice} |{1, Alice} |
|{"id": 2, "tags": ["spark", "python"], "active": true} |{2, NULL} |{2, null} |
|{"id": 3, "details": {"age": 30, "location": "New York"}} |{3, NULL} |{3, null} |
|{"id": 4, "history": [{"date": "2023-01-01", "action": "login"}, {"date": "2023-01-02", "action": "logout"}]}|{4, NULL} |{4, null} |
+-------------------------------------------------------------------------------------------------------------+-------------+----------------+
+-------------------------------------------------------------------------------------------------------------+-------------+----------------+-----------------+
|json_payload |parsed_struct|null_present_str|null_present_bool|
+-------------------------------------------------------------------------------------------------------------+-------------+----------------+-----------------+
|{"id": 1, "name": "Alice"} |{1, Alice} |{1, Alice} |false |
|{"id": 2, "tags": ["spark", "python"], "active": true} |{2, NULL} |{2, null} |true |
|{"id": 3, "details": {"age": 30, "location": "New York"}} |{3, NULL} |{3, null} |true |
|{"id": 4, "history": [{"date": "2023-01-01", "action": "login"}, {"date": "2023-01-02", "action": "logout"}]}|{4, NULL} |{4, null} |true |
+-------------------------------------------------------------------------------------------------------------+-------------+----------------+-----------------+
+--------------------------+-------------+----------------+-----------------+
|json_payload |parsed_struct|null_present_str|null_present_bool|
+--------------------------+-------------+----------------+-----------------+
|{"id": 1, "name": "Alice"}|{1, Alice} |{1, Alice} |false |
+--------------------------+-------------+----------------+-----------------+
+-------------------------------------------------------------------------------------------------------------+-------------+----------------+-----------------+
|json_payload |parsed_struct|null_present_str|null_present_bool|
+-------------------------------------------------------------------------------------------------------------+-------------+----------------+-----------------+
|{"id": 2, "tags": ["spark", "python"], "active": true} |{2, NULL} |{2, null} |true |
|{"id": 3, "details": {"age": 30, "location": "New York"}} |{3, NULL} |{3, null} |true |
|{"id": 4, "history": [{"date": "2023-01-01", "action": "login"}, {"date": "2023-01-02", "action": "logout"}]}|{4, NULL} |{4, null} |true |
+-------------------------------------------------------------------------------------------------------------+-------------+----------------+-----------------+
USING SCHEMA FROM ONE ROW
current_string_repr='{"id": 2, "tags": ["spark", "python"], "active": true}'
+-------------------------------------------------------------------------------------------------------------+--------------------------+----------------+-----------------+
|json_payload |parsed_struct |null_present_str|null_present_bool|
+-------------------------------------------------------------------------------------------------------------+--------------------------+----------------+-----------------+
|{"id": 2, "tags": ["spark", "python"], "active": true} |{true, 2, [spark, python]}|{2, null} |true |
|{"id": 3, "details": {"age": 30, "location": "New York"}} |{NULL, 3, NULL} |{3, null} |true |
|{"id": 4, "history": [{"date": "2023-01-01", "action": "login"}, {"date": "2023-01-02", "action": "logout"}]}|{NULL, 4, NULL} |{4, null} |true |
+-------------------------------------------------------------------------------------------------------------+--------------------------+----------------+-----------------+
+-------------------------------------------------------------------------------------------------------------+--------------------------+--------------------------+-----------------+
|json_payload |parsed_struct |null_present_str |null_present_bool|
+-------------------------------------------------------------------------------------------------------------+--------------------------+--------------------------+-----------------+
|{"id": 2, "tags": ["spark", "python"], "active": true} |{true, 2, [spark, python]}|{true, 2, [spark, python]}|true |
|{"id": 3, "details": {"age": 30, "location": "New York"}} |{NULL, 3, NULL} |{null, 3, null} |true |
|{"id": 4, "history": [{"date": "2023-01-01", "action": "login"}, {"date": "2023-01-02", "action": "logout"}]}|{NULL, 4, NULL} |{null, 4, null} |true |
+-------------------------------------------------------------------------------------------------------------+--------------------------+--------------------------+-----------------+
+-------------------------------------------------------------------------------------------------------------+--------------------------+--------------------------+-----------------+
|json_payload |parsed_struct |null_present_str |null_present_bool|
+-------------------------------------------------------------------------------------------------------------+--------------------------+--------------------------+-----------------+
|{"id": 2, "tags": ["spark", "python"], "active": true} |{true, 2, [spark, python]}|{true, 2, [spark, python]}|false |
|{"id": 3, "details": {"age": 30, "location": "New York"}} |{NULL, 3, NULL} |{null, 3, null} |true |
|{"id": 4, "history": [{"date": "2023-01-01", "action": "login"}, {"date": "2023-01-02", "action": "logout"}]}|{NULL, 4, NULL} |{null, 4, null} |true |
+-------------------------------------------------------------------------------------------------------------+--------------------------+--------------------------+-----------------+
+------------------------------------------------------+--------------------------+--------------------------+-----------------+
|json_payload |parsed_struct |null_present_str |null_present_bool|
+------------------------------------------------------+--------------------------+--------------------------+-----------------+
|{"id": 2, "tags": ["spark", "python"], "active": true}|{true, 2, [spark, python]}|{true, 2, [spark, python]}|false |
+------------------------------------------------------+--------------------------+--------------------------+-----------------+
+-------------------------------------------------------------------------------------------------------------+---------------+----------------+-----------------+
|json_payload |parsed_struct |null_present_str|null_present_bool|
+-------------------------------------------------------------------------------------------------------------+---------------+----------------+-----------------+
|{"id": 3, "details": {"age": 30, "location": "New York"}} |{NULL, 3, NULL}|{null, 3, null} |true |
|{"id": 4, "history": [{"date": "2023-01-01", "action": "login"}, {"date": "2023-01-02", "action": "logout"}]}|{NULL, 4, NULL}|{null, 4, null} |true |
+-------------------------------------------------------------------------------------------------------------+---------------+----------------+-----------------+
USING SCHEMA FROM ONE ROW
current_string_repr='{"id": 3, "details": {"age": 30, "location": "New York"}}'
+-------------------------------------------------------------------------------------------------------------+-------------------+----------------+-----------------+
|json_payload |parsed_struct |null_present_str|null_present_bool|
+-------------------------------------------------------------------------------------------------------------+-------------------+----------------+-----------------+
|{"id": 3, "details": {"age": 30, "location": "New York"}} |{{30, New York}, 3}|{null, 3, null} |true |
|{"id": 4, "history": [{"date": "2023-01-01", "action": "login"}, {"date": "2023-01-02", "action": "logout"}]}|{NULL, 4} |{null, 4, null} |true |
+-------------------------------------------------------------------------------------------------------------+-------------------+----------------+-----------------+
+-------------------------------------------------------------------------------------------------------------+-------------------+-------------------+-----------------+
|json_payload |parsed_struct |null_present_str |null_present_bool|
+-------------------------------------------------------------------------------------------------------------+-------------------+-------------------+-----------------+
|{"id": 3, "details": {"age": 30, "location": "New York"}} |{{30, New York}, 3}|{{30, New York}, 3}|true |
|{"id": 4, "history": [{"date": "2023-01-01", "action": "login"}, {"date": "2023-01-02", "action": "logout"}]}|{NULL, 4} |{null, 4} |true |
+-------------------------------------------------------------------------------------------------------------+-------------------+-------------------+-----------------+
+-------------------------------------------------------------------------------------------------------------+-------------------+-------------------+-----------------+
|json_payload |parsed_struct |null_present_str |null_present_bool|
+-------------------------------------------------------------------------------------------------------------+-------------------+-------------------+-----------------+
|{"id": 3, "details": {"age": 30, "location": "New York"}} |{{30, New York}, 3}|{{30, New York}, 3}|false |
|{"id": 4, "history": [{"date": "2023-01-01", "action": "login"}, {"date": "2023-01-02", "action": "logout"}]}|{NULL, 4} |{null, 4} |true |
+-------------------------------------------------------------------------------------------------------------+-------------------+-------------------+-----------------+
+---------------------------------------------------------+-------------------+-------------------+-----------------+
|json_payload |parsed_struct |null_present_str |null_present_bool|
+---------------------------------------------------------+-------------------+-------------------+-----------------+
|{"id": 3, "details": {"age": 30, "location": "New York"}}|{{30, New York}, 3}|{{30, New York}, 3}|false |
+---------------------------------------------------------+-------------------+-------------------+-----------------+
+-------------------------------------------------------------------------------------------------------------+-------------+----------------+-----------------+
|json_payload |parsed_struct|null_present_str|null_present_bool|
+-------------------------------------------------------------------------------------------------------------+-------------+----------------+-----------------+
|{"id": 4, "history": [{"date": "2023-01-01", "action": "login"}, {"date": "2023-01-02", "action": "logout"}]}|{NULL, 4} |{null, 4} |true |
+-------------------------------------------------------------------------------------------------------------+-------------+----------------+-----------------+
USING SCHEMA FROM ONE ROW
current_string_repr='{"id": 4, "history": [{"date": "2023-01-01", "action": "login"}, {"date": "2023-01-02", "action": "logout"}]}'
+-------------------------------------------------------------------------------------------------------------+------------------------------------------------+----------------+-----------------+
|json_payload |parsed_struct |null_present_str|null_present_bool|
+-------------------------------------------------------------------------------------------------------------+------------------------------------------------+----------------+-----------------+
|{"id": 4, "history": [{"date": "2023-01-01", "action": "login"}, {"date": "2023-01-02", "action": "logout"}]}|{[{login, 2023-01-01}, {logout, 2023-01-02}], 4}|{null, 4} |true |
+-------------------------------------------------------------------------------------------------------------+------------------------------------------------+----------------+-----------------+
+-------------------------------------------------------------------------------------------------------------+------------------------------------------------+------------------------------------------------+-----------------+
|json_payload |parsed_struct |null_present_str |null_present_bool|
+-------------------------------------------------------------------------------------------------------------+------------------------------------------------+------------------------------------------------+-----------------+
|{"id": 4, "history": [{"date": "2023-01-01", "action": "login"}, {"date": "2023-01-02", "action": "logout"}]}|{[{login, 2023-01-01}, {logout, 2023-01-02}], 4}|{[{login, 2023-01-01}, {logout, 2023-01-02}], 4}|true |
+-------------------------------------------------------------------------------------------------------------+------------------------------------------------+------------------------------------------------+-----------------+
+-------------------------------------------------------------------------------------------------------------+------------------------------------------------+------------------------------------------------+-----------------+
|json_payload |parsed_struct |null_present_str |null_present_bool|
+-------------------------------------------------------------------------------------------------------------+------------------------------------------------+------------------------------------------------+-----------------+
|{"id": 4, "history": [{"date": "2023-01-01", "action": "login"}, {"date": "2023-01-02", "action": "logout"}]}|{[{login, 2023-01-01}, {logout, 2023-01-02}], 4}|{[{login, 2023-01-01}, {logout, 2023-01-02}], 4}|false |
+-------------------------------------------------------------------------------------------------------------+------------------------------------------------+------------------------------------------------+-----------------+
+-------------------------------------------------------------------------------------------------------------+------------------------------------------------+------------------------------------------------+-----------------+
|json_payload |parsed_struct |null_present_str |null_present_bool|
+-------------------------------------------------------------------------------------------------------------+------------------------------------------------+------------------------------------------------+-----------------+
|{"id": 4, "history": [{"date": "2023-01-01", "action": "login"}, {"date": "2023-01-02", "action": "logout"}]}|{[{login, 2023-01-01}, {logout, 2023-01-02}], 4}|{[{login, 2023-01-01}, {logout, 2023-01-02}], 4}|false |
+-------------------------------------------------------------------------------------------------------------+------------------------------------------------+------------------------------------------------+-----------------+
+------------+-------------+----------------+-----------------+
|json_payload|parsed_struct|null_present_str|null_present_bool|
+------------+-------------+----------------+-----------------+
+------------+-------------+----------------+-----------------+
+--------------------------+-------------+----------------+-----------------+
|json_payload |parsed_struct|null_present_str|null_present_bool|
+--------------------------+-------------+----------------+-----------------+
|{"id": 1, "name": "Alice"}|{1, Alice} |{1, Alice} |false |
+--------------------------+-------------+----------------+-----------------+
+------------------------------------------------------+--------------------------+--------------------------+-----------------+
|json_payload |parsed_struct |null_present_str |null_present_bool|
+------------------------------------------------------+--------------------------+--------------------------+-----------------+
|{"id": 2, "tags": ["spark", "python"], "active": true}|{true, 2, [spark, python]}|{true, 2, [spark, python]}|false |
+------------------------------------------------------+--------------------------+--------------------------+-----------------+
+---------------------------------------------------------+-------------------+-------------------+-----------------+
|json_payload |parsed_struct |null_present_str |null_present_bool|
+---------------------------------------------------------+-------------------+-------------------+-----------------+
|{"id": 3, "details": {"age": 30, "location": "New York"}}|{{30, New York}, 3}|{{30, New York}, 3}|false |
+---------------------------------------------------------+-------------------+-------------------+-----------------+
+-------------------------------------------------------------------------------------------------------------+------------------------------------------------+------------------------------------------------+-----------------+
|json_payload |parsed_struct |null_present_str |null_present_bool|
+-------------------------------------------------------------------------------------------------------------+------------------------------------------------+------------------------------------------------+-----------------+
|{"id": 4, "history": [{"date": "2023-01-01", "action": "login"}, {"date": "2023-01-02", "action": "logout"}]}|{[{login, 2023-01-01}, {logout, 2023-01-02}], 4}|{[{login, 2023-01-01}, {logout, 2023-01-02}], 4}|false |
+-------------------------------------------------------------------------------------------------------------+------------------------------------------------+------------------------------------------------+-----------------+
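Each DataFrame in wanted_df_list still holds a nested parsed_struct column, so one more step is needed to reach the dotted column names from the question. As a minimal sketch, the hypothetical helper leaf_paths below walks a schema given in the dict form that StructType.jsonValue() produces and lists the dotted path of every non-struct field. The schema dict here is hand-written to stand in for the id/details row; it was not produced by Spark.

```python
def leaf_paths(schema, prefix=""):
    """List dotted paths to every non-struct field in a StructType.jsonValue()-style dict."""
    paths = []
    for field in schema["fields"]:
        path = f"{prefix}{field['name']}"
        field_type = field["type"]
        if isinstance(field_type, dict) and field_type.get("type") == "struct":
            # Nested struct: recurse and extend the dotted prefix
            paths.extend(leaf_paths(field_type, prefix=path + "."))
        else:
            # Atomic types and arrays are kept as a single column
            paths.append(path)
    return paths


# Hand-written stand-in for the schema of '{"id": 3, "details": {...}}'
schema = {
    "type": "struct",
    "fields": [
        {"name": "details", "type": {"type": "struct", "fields": [
            {"name": "age", "type": "long", "nullable": True, "metadata": {}},
            {"name": "location", "type": "string", "nullable": True, "metadata": {}},
        ]}, "nullable": True, "metadata": {}},
        {"name": "id", "type": "long", "nullable": True, "metadata": {}},
    ],
}

print(leaf_paths(schema))
# ['details.age', 'details.location', 'id']
```

In Spark, the dict would presumably come from df_ele.schema["parsed_struct"].dataType.jsonValue(), and each path could then be selected with col("parsed_struct." + path).alias(path). That part is untested here, and column names containing dots need backticks when referenced later.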