Flatten a dynamic JSON payload string using pyspark


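I have a scenario where I want to completely flatten a string payload of JSON data into separate columns and load it into a pyspark dataframe for further processing. The structure of the raw data is not fixed; in particular, the payload keeps changing.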

Sample data:

{"id0":"d376","name":"data2","time_stamp":"2024-03-27", "payload":"{\"id\":\"d16\",\"time\":\"2000-01-01\",\"type\":\"XYZ\",\"detail\":{\"idx\":\"abc\",\"name\":\"abc\"},\"owner\":{\"id\":\"123\",\"comunicate\":true},\"order\":{\"id\":\"2ff4973580\"},\"product\":{\"id4\":\"aiobfd8315\",\"name2\":\"fidoufodbf\"},\"transaction\":{\"amount\":90 ,\"category\":\"ugfiodfu\",\"create\":\"2024-03-27\",\"type\":\"idugoiuiofg\",\"gid\":\"d4ad\",\"id\":\"61c1025929e\",\"i_id\":\"3b6085400\",\"m_id\":\"1922c45\",\"prett\":null,\"pro_id\":\"a21b31f8315\",\"status\":\"waiting\",\"data2\":{\"create\":\"2000-01-01\",\"delivered\":yes,\"cla\":[\"irugreogeru\",\"fdijfdiog\"],\"id\":\"5542cefba043\",\"items\":[{\"quantity\":\"1\",\"price\":\"564\"}],\"approve\":null,\"tid\":\"6b1025929e\",\"update\":\"2001-02-02\"},\"t_extra\":{\"b_address\":{\"city\":\"\",\"country\":\"\",\"line1\":\"\",\"line2\":\"\",\"state\":\"\"},\"amount\":0,\"display\":{},\"id\":\"d06f5b502d64\",\"metadata\":{\"id4\":\"m044zi1gi\",\"email\":\"[email protected]\"},\"address\":{\"city\":\"\",\"country\":\"\",\"line1\":\"\",\"line2\":\"\",\"code\":\"\",\"state\":\"\"},\"update\":\"2000-01-01\"},\"token\":\"14849f64fc\",\"id5\":\"fbd2cceee\"},\"user\":{\"id\":\"fbdcceee\",\"number\":\"777777777\"},\"sum\":\"1\"}"}
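As posted, the inner payload is not strictly valid JSON: the fragment "delivered":yes uses a bare word, while JSON only allows true, false, null, numbers, strings, objects or arrays as values (this may simply be an artifact of anonymizing the data). If the real data looks like this, json.loads inside a UDF would raise, and Spark's own JSON parsing would likely produce nulls for that row. A quick standard-library check on a trimmed fragment (is_valid_json is a hypothetical helper name):

```python
import json


def is_valid_json(text):
    """Return True if the standard library can parse text as JSON."""
    try:
        json.loads(text)
        return True
    except json.JSONDecodeError:
        return False


# Trimmed fragment of the posted payload, keeping the bare word `yes`
fragment = '{"data2": {"create": "2000-01-01", "delivered": yes}}'

print(is_valid_json(fragment))                             # False
print(is_valid_json(fragment.replace(": yes", ": true")))  # True
```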

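My file contains multiple lines of JSON data. The goal is to drop all the outer columns of the sample JSON and flatten all the inner columns inside the JSON's payload string attribute.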

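The JSON is dynamic, so the usual approach of defining a schema with StructType, ArrayType, etc. is not very helpful and is limiting (it means defining a schema for every event type), because the data is dynamic and all columns need to be flattened.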

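The closest I have come to a solution is the code below, but it still keeps the payload wrapped as key-value pairs in a single column instead of extracting them into separate columns.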

Code:

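import json

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import MapType, StringType


def flatten_json(y):
    """Flatten nested dicts/lists into one dict with dotted keys."""
    out = {}

    def flatten(x, name=''):
        if isinstance(x, dict):
            for key in x:
                flatten(x[key], name + key + '.')
        elif isinstance(x, list):
            # List elements use their index as the next path segment
            for i, item in enumerate(x):
                flatten(item, name + str(i) + '.')
        else:
            # Leaf value: drop the trailing '.' from the accumulated key
            out[name[:-1]] = x

    flatten(y)
    return out


spark = SparkSession.builder \
    .appName("App") \
    .getOrCreate()

df = spark.read.json(path)  # path: location of the input file
df = df.select('payload')

# The UDF returns ONE map column (key -> value), which is why the result is
# still a single column of key/value pairs instead of separate columns
flatten_udf = udf(lambda json_str: flatten_json(json.loads(json_str)), MapType(StringType(), StringType()))
df_flattened = df.withColumn("payload", flatten_udf("payload"))
df_flattened.show(truncate=False)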
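To see what ends up inside that single map column, here is the same flattening logic run in plain Python on a trimmed-down version of the sample payload (the trimming, including moving an empty "display" object under "transaction", is only for illustration). The dotted keys are exactly the column names the question hopes to get, and empty objects or arrays disappear entirely because they have no leaf values.

```python
import json


def flatten_json(y):
    # Same logic as the question's helper: dotted keys, list indices as path parts
    out = {}

    def flatten(x, name=""):
        if isinstance(x, dict):
            for key in x:
                flatten(x[key], name + key + ".")
        elif isinstance(x, list):
            for i, item in enumerate(x):
                flatten(item, name + str(i) + ".")
        else:
            out[name[:-1]] = x

    flatten(y)
    return out


# Trimmed-down payload based on the question's sample
payload = ('{"id": "d16", "detail": {"idx": "abc", "name": "abc"}, '
           '"owner": {"id": "123", "comunicate": true}, '
           '"transaction": {"data2": {"items": [{"quantity": "1", "price": "564"}]}, "display": {}}}')

flat = flatten_json(json.loads(payload))
for key, value in flat.items():
    print(key, "->", value)
```

In Spark, each of these keys is a key of the map in the payload column. One possible way to turn the map into real columns, untested against the full data, is to collect the distinct keys once, e.g. df_flattened.select(explode(map_keys("payload"))).distinct().collect(), and then select col("payload")[k].alias(k) for each key k. Column names containing dots would then need backtick quoting when referenced later.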

Expected: separate df columns, for example:

id -> string column
time -> string column
type -> string column
detail.idx -> string column
detail.name -> string column
owner.id -> string column
owner.communicate -> string column
... etc., all other columns
1 Answer

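PySpark has a handy function, schema_of_json, which derives the schema of a JSON string; that schema can then be applied to the whole column (here with from_json).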
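Judging from the outputs later in this answer, for example {true, 2, [spark, python]} for a payload written in the order id, tags, active, the struct built from schema_of_json lists fields alphabetically rather than in the order they appear in the JSON. Sorting the keys in plain Python is a quick way to predict the top-level field order; this mirrors the observed output and is not a Spark call.

```python
import json

rows = [
    '{"id": 2, "tags": ["spark", "python"], "active": true}',
    '{"id": 3, "details": {"age": 30, "location": "New York"}}',
]

# Alphabetical key order, matching the field order seen in parsed_struct
for row in rows:
    print(sorted(json.loads(row)))
```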

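So the approach for handling a dynamic JSON payload is as follows:

  • First, take the json_payload of the first row of the dataframe.
  • Create a schema for that json_payload with schema_of_json.
  • Then, if every row is parsed correctly, there will be no null values. If nulls are present, those rows were not parsed correctly. So we cast the parsed struct back to a string and use .contains to check whether the string column contains the string "null".
  • This gives us two dataframes: one with the correctly parsed values and one with the incorrectly parsed values.
  • Now repeat the process on the incorrectly parsed dataframe.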
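The steps above can be simulated in plain Python to see how rows end up grouped. This is only a rough analogue, not Spark: infer_shape, apply_shape and has_none are hypothetical helpers standing in for schema_of_json, from_json and the "null" substring test, and unlike Spark's from_json a type mismatch simply yields None here. It adds a guard the steps do not mention: a row whose payload holds a JSON null (the question's sample has "prett":null) never parses without nulls, so without some guard the loop would presumably never terminate. It also shows that a row with extra fields passes, with those extra fields silently dropped. A substring test on the text form can additionally misfire on ordinary values such as "annulled".

```python
import json


def infer_shape(value):
    """Rough stand-in for schema_of_json: describe the structure of one value."""
    if isinstance(value, dict):
        return {key: infer_shape(val) for key, val in value.items()}
    if isinstance(value, list):
        # Use the first element's shape as the array element shape
        return [infer_shape(value[0])] if value else []
    return type(value).__name__  # "int", "str", "bool", "float", "NoneType"


def apply_shape(value, shape):
    """Rough stand-in for from_json: project value onto shape, None where it does not fit."""
    if isinstance(shape, dict):
        if not isinstance(value, dict):
            return None
        # Missing keys become None; keys not in the shape are silently dropped
        return {key: apply_shape(value.get(key), sub) for key, sub in shape.items()}
    if isinstance(shape, list):
        if not isinstance(value, list):
            return None
        return [apply_shape(item, shape[0]) for item in value] if shape else value
    return value if type(value).__name__ == shape else None


def has_none(value):
    """Structural version of the .contains("null") check."""
    if value is None:
        return True
    if isinstance(value, dict):
        return any(has_none(v) for v in value.values())
    if isinstance(value, list):
        return any(has_none(v) for v in value)
    return False


def group_by_first_row_shape(json_rows):
    """Take the first remaining row's shape, keep rows that fit it, repeat on the rest."""
    remaining = [json.loads(row) for row in json_rows]
    groups = []
    while remaining:
        shape = infer_shape(remaining[0])
        fits, rest = [], []
        for record in remaining:
            (rest if has_none(apply_shape(record, shape)) else fits).append(record)
        if not fits:
            # Guard: the shape row itself contains a JSON null, so nothing fits;
            # accept it alone so every iteration makes progress
            fits, rest = [remaining[0]], remaining[1:]
        groups.append(fits)
        remaining = rest
    return groups


rows = [
    '{"id": 1, "name": "Alice"}',
    '{"id": 2, "tags": ["spark", "python"], "active": true}',
    '{"id": 3, "details": {"age": 30, "location": "New York"}}',
    '{"id": 4, "history": [{"date": "2023-01-01", "action": "login"}, {"date": "2023-01-02", "action": "logout"}]}',
]
for group in group_by_first_row_shape(rows):
    print([record["id"] for record in group])
```

With the answer's four sample rows this yields one group per row, matching the four dataframes in the final output.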

Here is a sample script using made-up data:

from pyspark.sql import Row, SparkSession
from pyspark.sql.functions import col, from_json, schema_of_json
from pyspark.sql.types import StringType, StructField, StructType


spark = SparkSession.builder \
    .appName("JsonPayloadDataFrame") \
    .getOrCreate()

schema = StructType([
    StructField("json_payload", StringType(), True)
])

data = [
    Row(json_payload='{"id": 1, "name": "Alice"}'),
    Row(json_payload='{"id": 2, "tags": ["spark", "python"], "active": true}'),
    Row(json_payload='{"id": 3, "details": {"age": 30, "location": "New York"}}'),
    Row(json_payload='{"id": 4, "history": [{"date": "2023-01-01", "action": "login"}, {"date": "2023-01-02", "action": "logout"}]}')
]

df = spark.createDataFrame(data, schema=schema)
df.printSchema()
df.show(n=30, truncate=False)


wanted_df_list = []  # one dataframe per distinct payload shape
df_looped = df       # rows that still need a matching schema

while not df_looped.isEmpty():  # DataFrame.isEmpty() needs Spark 3.3+
    print("USING SCHEMA FROM ONE ROW")
    # Raw JSON string of one row, used to infer the schema for this round
    current_string_repr = df_looped.first()["json_payload"]
    print(f"{current_string_repr=}")

    # Parse every remaining row with the schema inferred from that one row
    df_parsed = df_looped.withColumn("parsed_struct", from_json(col("json_payload"), schema_of_json(current_string_repr)))
    df_parsed.show(n=30, truncate=False)
    df_parsed.printSchema()

    # Render the struct as text so missing fields show up as "null"
    df_null_check = df_parsed.withColumn("null_present_str", col("parsed_struct").cast(StringType()))
    df_null_check.show(n=30, truncate=False)
    df_null_check.printSchema()

    df_null_present = df_null_check.withColumn("null_present_bool", col("null_present_str").contains("null"))
    df_null_present.show(n=30, truncate=False)
    df_null_present.printSchema()

    # Rows without nulls matched this schema; the rest go to the next round
    df_partial_correct = df_null_present.filter(col("null_present_bool") == False)
    df_partial_incorrect = df_null_present.filter(col("null_present_bool") == True)
    df_partial_correct.cache().show(n=30, truncate=False)
    df_partial_correct.printSchema()
    df_partial_incorrect.cache().show(n=30, truncate=False)
    df_partial_incorrect.printSchema()

    if df_partial_correct.isEmpty():
        # Guard: if not even the schema row parses without nulls (e.g. it holds
        # a JSON null), the loop would otherwise never terminate
        raise ValueError(f"No row matched the schema inferred from: {current_string_repr}")

    wanted_df_list.append(df_partial_correct)

    df_looped = df_partial_incorrect


for df_ele in wanted_df_list:
    df_ele.show(n=30, truncate=False)

Final output:

+--------------------------+-------------+----------------+-----------------+
|json_payload              |parsed_struct|null_present_str|null_present_bool|
+--------------------------+-------------+----------------+-----------------+
|{"id": 1, "name": "Alice"}|{1, Alice}   |{1, Alice}      |false            |
+--------------------------+-------------+----------------+-----------------+

+------------------------------------------------------+--------------------------+--------------------------+-----------------+
|json_payload                                          |parsed_struct             |null_present_str          |null_present_bool|
+------------------------------------------------------+--------------------------+--------------------------+-----------------+
|{"id": 2, "tags": ["spark", "python"], "active": true}|{true, 2, [spark, python]}|{true, 2, [spark, python]}|false            |
+------------------------------------------------------+--------------------------+--------------------------+-----------------+

+---------------------------------------------------------+-------------------+-------------------+-----------------+
|json_payload                                             |parsed_struct      |null_present_str   |null_present_bool|
+---------------------------------------------------------+-------------------+-------------------+-----------------+
|{"id": 3, "details": {"age": 30, "location": "New York"}}|{{30, New York}, 3}|{{30, New York}, 3}|false            |
+---------------------------------------------------------+-------------------+-------------------+-----------------+

+-------------------------------------------------------------------------------------------------------------+------------------------------------------------+------------------------------------------------+-----------------+
|json_payload                                                                                                 |parsed_struct                                   |null_present_str                                |null_present_bool|
+-------------------------------------------------------------------------------------------------------------+------------------------------------------------+------------------------------------------------+-----------------+
|{"id": 4, "history": [{"date": "2023-01-01", "action": "login"}, {"date": "2023-01-02", "action": "logout"}]}|{[{login, 2023-01-01}, {logout, 2023-01-02}], 4}|{[{login, 2023-01-01}, {logout, 2023-01-02}], 4}|false            |
+-------------------------------------------------------------------------------------------------------------+------------------------------------------------+------------------------------------------------+-----------------+
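The loop stops at a nested parsed_struct column, while the question asks for one column per leaf field. A minimal sketch of that last step, assuming the struct schema is read as plain JSON with df_ele.schema["parsed_struct"].dataType.jsonValue() (a real PySpark method; the helper name flatten_paths is hypothetical): walk the struct recursively and collect dotted paths to every non-struct field. Arrays and maps are kept as single columns here; exploding them would be a separate design choice.

```python
def flatten_paths(struct_json, prefix=""):
    """Return dotted paths to every leaf field of a Spark struct schema.

    struct_json is assumed to be the dict produced by StructType.jsonValue():
    {"type": "struct", "fields": [{"name": ..., "type": ..., ...}, ...]}.
    Nested structs are descended into; arrays, maps and scalars are leaves.
    """
    paths = []
    for field in struct_json["fields"]:
        path = prefix + field["name"]
        field_type = field["type"]
        if isinstance(field_type, dict) and field_type.get("type") == "struct":
            paths.extend(flatten_paths(field_type, path + "."))
        else:
            paths.append(path)
    return paths


# Hand-written stand-in for the parsed schema of
# {"id": 3, "details": {"age": 30, "location": "New York"}}
details_schema = {
    "type": "struct",
    "fields": [
        {"name": "details", "nullable": True, "metadata": {},
         "type": {"type": "struct", "fields": [
             {"name": "age", "type": "long", "nullable": True, "metadata": {}},
             {"name": "location", "type": "string", "nullable": True, "metadata": {}},
         ]}},
        {"name": "id", "type": "long", "nullable": True, "metadata": {}},
    ],
}

print(flatten_paths(details_schema, prefix="parsed_struct."))
```

With a real dataframe this would be used roughly as paths = flatten_paths(df_ele.schema["parsed_struct"].dataType.jsonValue(), "parsed_struct."), followed by df_ele.select([col(p).alias(p[len("parsed_struct."):]) for p in paths]). Field names that themselves contain dots would need backtick quoting inside col().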

Full output:

USING SCHEMA FROM ONE ROW
current_string_repr='{"id": 1, "name": "Alice"}'
+-------------------------------------------------------------------------------------------------------------+-------------+
|json_payload                                                                                                 |parsed_struct|
+-------------------------------------------------------------------------------------------------------------+-------------+
|{"id": 1, "name": "Alice"}                                                                                   |{1, Alice}   |
|{"id": 2, "tags": ["spark", "python"], "active": true}                                                       |{2, NULL}    |
|{"id": 3, "details": {"age": 30, "location": "New York"}}                                                    |{3, NULL}    |
|{"id": 4, "history": [{"date": "2023-01-01", "action": "login"}, {"date": "2023-01-02", "action": "logout"}]}|{4, NULL}    |
+-------------------------------------------------------------------------------------------------------------+-------------+

+-------------------------------------------------------------------------------------------------------------+-------------+----------------+
|json_payload                                                                                                 |parsed_struct|null_present_str|
+-------------------------------------------------------------------------------------------------------------+-------------+----------------+
|{"id": 1, "name": "Alice"}                                                                                   |{1, Alice}   |{1, Alice}      |
|{"id": 2, "tags": ["spark", "python"], "active": true}                                                       |{2, NULL}    |{2, null}       |
|{"id": 3, "details": {"age": 30, "location": "New York"}}                                                    |{3, NULL}    |{3, null}       |
|{"id": 4, "history": [{"date": "2023-01-01", "action": "login"}, {"date": "2023-01-02", "action": "logout"}]}|{4, NULL}    |{4, null}       |
+-------------------------------------------------------------------------------------------------------------+-------------+----------------+

+-------------------------------------------------------------------------------------------------------------+-------------+----------------+-----------------+
|json_payload                                                                                                 |parsed_struct|null_present_str|null_present_bool|
+-------------------------------------------------------------------------------------------------------------+-------------+----------------+-----------------+
|{"id": 1, "name": "Alice"}                                                                                   |{1, Alice}   |{1, Alice}      |false            |
|{"id": 2, "tags": ["spark", "python"], "active": true}                                                       |{2, NULL}    |{2, null}       |true             |
|{"id": 3, "details": {"age": 30, "location": "New York"}}                                                    |{3, NULL}    |{3, null}       |true             |
|{"id": 4, "history": [{"date": "2023-01-01", "action": "login"}, {"date": "2023-01-02", "action": "logout"}]}|{4, NULL}    |{4, null}       |true             |
+-------------------------------------------------------------------------------------------------------------+-------------+----------------+-----------------+

+--------------------------+-------------+----------------+-----------------+
|json_payload              |parsed_struct|null_present_str|null_present_bool|
+--------------------------+-------------+----------------+-----------------+
|{"id": 1, "name": "Alice"}|{1, Alice}   |{1, Alice}      |false            |
+--------------------------+-------------+----------------+-----------------+

+-------------------------------------------------------------------------------------------------------------+-------------+----------------+-----------------+
|json_payload                                                                                                 |parsed_struct|null_present_str|null_present_bool|
+-------------------------------------------------------------------------------------------------------------+-------------+----------------+-----------------+
|{"id": 2, "tags": ["spark", "python"], "active": true}                                                       |{2, NULL}    |{2, null}       |true             |
|{"id": 3, "details": {"age": 30, "location": "New York"}}                                                    |{3, NULL}    |{3, null}       |true             |
|{"id": 4, "history": [{"date": "2023-01-01", "action": "login"}, {"date": "2023-01-02", "action": "logout"}]}|{4, NULL}    |{4, null}       |true             |
+-------------------------------------------------------------------------------------------------------------+-------------+----------------+-----------------+

USING SCHEMA FROM ONE ROW
current_string_repr='{"id": 2, "tags": ["spark", "python"], "active": true}'
+-------------------------------------------------------------------------------------------------------------+--------------------------+----------------+-----------------+
|json_payload                                                                                                 |parsed_struct             |null_present_str|null_present_bool|
+-------------------------------------------------------------------------------------------------------------+--------------------------+----------------+-----------------+
|{"id": 2, "tags": ["spark", "python"], "active": true}                                                       |{true, 2, [spark, python]}|{2, null}       |true             |
|{"id": 3, "details": {"age": 30, "location": "New York"}}                                                    |{NULL, 3, NULL}           |{3, null}       |true             |
|{"id": 4, "history": [{"date": "2023-01-01", "action": "login"}, {"date": "2023-01-02", "action": "logout"}]}|{NULL, 4, NULL}           |{4, null}       |true             |
+-------------------------------------------------------------------------------------------------------------+--------------------------+----------------+-----------------+

+-------------------------------------------------------------------------------------------------------------+--------------------------+--------------------------+-----------------+
|json_payload                                                                                                 |parsed_struct             |null_present_str          |null_present_bool|
+-------------------------------------------------------------------------------------------------------------+--------------------------+--------------------------+-----------------+
|{"id": 2, "tags": ["spark", "python"], "active": true}                                                       |{true, 2, [spark, python]}|{true, 2, [spark, python]}|true             |
|{"id": 3, "details": {"age": 30, "location": "New York"}}                                                    |{NULL, 3, NULL}           |{null, 3, null}           |true             |
|{"id": 4, "history": [{"date": "2023-01-01", "action": "login"}, {"date": "2023-01-02", "action": "logout"}]}|{NULL, 4, NULL}           |{null, 4, null}           |true             |
+-------------------------------------------------------------------------------------------------------------+--------------------------+--------------------------+-----------------+

+-------------------------------------------------------------------------------------------------------------+--------------------------+--------------------------+-----------------+
|json_payload                                                                                                 |parsed_struct             |null_present_str          |null_present_bool|
+-------------------------------------------------------------------------------------------------------------+--------------------------+--------------------------+-----------------+
|{"id": 2, "tags": ["spark", "python"], "active": true}                                                       |{true, 2, [spark, python]}|{true, 2, [spark, python]}|false            |
|{"id": 3, "details": {"age": 30, "location": "New York"}}                                                    |{NULL, 3, NULL}           |{null, 3, null}           |true             |
|{"id": 4, "history": [{"date": "2023-01-01", "action": "login"}, {"date": "2023-01-02", "action": "logout"}]}|{NULL, 4, NULL}           |{null, 4, null}           |true             |
+-------------------------------------------------------------------------------------------------------------+--------------------------+--------------------------+-----------------+

+------------------------------------------------------+--------------------------+--------------------------+-----------------+
|json_payload                                          |parsed_struct             |null_present_str          |null_present_bool|
+------------------------------------------------------+--------------------------+--------------------------+-----------------+
|{"id": 2, "tags": ["spark", "python"], "active": true}|{true, 2, [spark, python]}|{true, 2, [spark, python]}|false            |
+------------------------------------------------------+--------------------------+--------------------------+-----------------+

+-------------------------------------------------------------------------------------------------------------+---------------+----------------+-----------------+
|json_payload                                                                                                 |parsed_struct  |null_present_str|null_present_bool|
+-------------------------------------------------------------------------------------------------------------+---------------+----------------+-----------------+
|{"id": 3, "details": {"age": 30, "location": "New York"}}                                                    |{NULL, 3, NULL}|{null, 3, null} |true             |
|{"id": 4, "history": [{"date": "2023-01-01", "action": "login"}, {"date": "2023-01-02", "action": "logout"}]}|{NULL, 4, NULL}|{null, 4, null} |true             |
+-------------------------------------------------------------------------------------------------------------+---------------+----------------+-----------------+

USING SCHEMA FROM ONE ROW
current_string_repr='{"id": 3, "details": {"age": 30, "location": "New York"}}'
+-------------------------------------------------------------------------------------------------------------+-------------------+----------------+-----------------+
|json_payload                                                                                                 |parsed_struct      |null_present_str|null_present_bool|
+-------------------------------------------------------------------------------------------------------------+-------------------+----------------+-----------------+
|{"id": 3, "details": {"age": 30, "location": "New York"}}                                                    |{{30, New York}, 3}|{null, 3, null} |true             |
|{"id": 4, "history": [{"date": "2023-01-01", "action": "login"}, {"date": "2023-01-02", "action": "logout"}]}|{NULL, 4}          |{null, 4, null} |true             |
+-------------------------------------------------------------------------------------------------------------+-------------------+----------------+-----------------+

+-------------------------------------------------------------------------------------------------------------+-------------------+-------------------+-----------------+
|json_payload                                                                                                 |parsed_struct      |null_present_str   |null_present_bool|
+-------------------------------------------------------------------------------------------------------------+-------------------+-------------------+-----------------+
|{"id": 3, "details": {"age": 30, "location": "New York"}}                                                    |{{30, New York}, 3}|{{30, New York}, 3}|true             |
|{"id": 4, "history": [{"date": "2023-01-01", "action": "login"}, {"date": "2023-01-02", "action": "logout"}]}|{NULL, 4}          |{null, 4}          |true             |
+-------------------------------------------------------------------------------------------------------------+-------------------+-------------------+-----------------+

+-------------------------------------------------------------------------------------------------------------+-------------------+-------------------+-----------------+
|json_payload                                                                                                 |parsed_struct      |null_present_str   |null_present_bool|
+-------------------------------------------------------------------------------------------------------------+-------------------+-------------------+-----------------+
|{"id": 3, "details": {"age": 30, "location": "New York"}}                                                    |{{30, New York}, 3}|{{30, New York}, 3}|false            |
|{"id": 4, "history": [{"date": "2023-01-01", "action": "login"}, {"date": "2023-01-02", "action": "logout"}]}|{NULL, 4}          |{null, 4}          |true             |
+-------------------------------------------------------------------------------------------------------------+-------------------+-------------------+-----------------+

+---------------------------------------------------------+-------------------+-------------------+-----------------+
|json_payload                                             |parsed_struct      |null_present_str   |null_present_bool|
+---------------------------------------------------------+-------------------+-------------------+-----------------+
|{"id": 3, "details": {"age": 30, "location": "New York"}}|{{30, New York}, 3}|{{30, New York}, 3}|false            |
+---------------------------------------------------------+-------------------+-------------------+-----------------+

+-------------------------------------------------------------------------------------------------------------+-------------+----------------+-----------------+
|json_payload                                                                                                 |parsed_struct|null_present_str|null_present_bool|
+-------------------------------------------------------------------------------------------------------------+-------------+----------------+-----------------+
|{"id": 4, "history": [{"date": "2023-01-01", "action": "login"}, {"date": "2023-01-02", "action": "logout"}]}|{NULL, 4}    |{null, 4}       |true             |
+-------------------------------------------------------------------------------------------------------------+-------------+----------------+-----------------+

USING SCHEMA FROM ONE ROW
current_string_repr='{"id": 4, "history": [{"date": "2023-01-01", "action": "login"}, {"date": "2023-01-02", "action": "logout"}]}'
+-------------------------------------------------------------------------------------------------------------+------------------------------------------------+----------------+-----------------+
|json_payload                                                                                                 |parsed_struct                                   |null_present_str|null_present_bool|
+-------------------------------------------------------------------------------------------------------------+------------------------------------------------+----------------+-----------------+
|{"id": 4, "history": [{"date": "2023-01-01", "action": "login"}, {"date": "2023-01-02", "action": "logout"}]}|{[{login, 2023-01-01}, {logout, 2023-01-02}], 4}|{null, 4}       |true             |
+-------------------------------------------------------------------------------------------------------------+------------------------------------------------+----------------+-----------------+

+-------------------------------------------------------------------------------------------------------------+------------------------------------------------+------------------------------------------------+-----------------+
|json_payload                                                                                                 |parsed_struct                                   |null_present_str                                |null_present_bool|
+-------------------------------------------------------------------------------------------------------------+------------------------------------------------+------------------------------------------------+-----------------+
|{"id": 4, "history": [{"date": "2023-01-01", "action": "login"}, {"date": "2023-01-02", "action": "logout"}]}|{[{login, 2023-01-01}, {logout, 2023-01-02}], 4}|{[{login, 2023-01-01}, {logout, 2023-01-02}], 4}|true             |
+-------------------------------------------------------------------------------------------------------------+------------------------------------------------+------------------------------------------------+-----------------+

+-------------------------------------------------------------------------------------------------------------+------------------------------------------------+------------------------------------------------+-----------------+
|json_payload                                                                                                 |parsed_struct                                   |null_present_str                                |null_present_bool|
+-------------------------------------------------------------------------------------------------------------+------------------------------------------------+------------------------------------------------+-----------------+
|{"id": 4, "history": [{"date": "2023-01-01", "action": "login"}, {"date": "2023-01-02", "action": "logout"}]}|{[{login, 2023-01-01}, {logout, 2023-01-02}], 4}|{[{login, 2023-01-01}, {logout, 2023-01-02}], 4}|false            |
+-------------------------------------------------------------------------------------------------------------+------------------------------------------------+------------------------------------------------+-----------------+

+-------------------------------------------------------------------------------------------------------------+------------------------------------------------+------------------------------------------------+-----------------+
|json_payload                                                                                                 |parsed_struct                                   |null_present_str                                |null_present_bool|
+-------------------------------------------------------------------------------------------------------------+------------------------------------------------+------------------------------------------------+-----------------+
|{"id": 4, "history": [{"date": "2023-01-01", "action": "login"}, {"date": "2023-01-02", "action": "logout"}]}|{[{login, 2023-01-01}, {logout, 2023-01-02}], 4}|{[{login, 2023-01-01}, {logout, 2023-01-02}], 4}|false            |
+-------------------------------------------------------------------------------------------------------------+------------------------------------------------+------------------------------------------------+-----------------+

+------------+-------------+----------------+-----------------+
|json_payload|parsed_struct|null_present_str|null_present_bool|
+------------+-------------+----------------+-----------------+
+------------+-------------+----------------+-----------------+

+--------------------------+-------------+----------------+-----------------+
|json_payload              |parsed_struct|null_present_str|null_present_bool|
+--------------------------+-------------+----------------+-----------------+
|{"id": 1, "name": "Alice"}|{1, Alice}   |{1, Alice}      |false            |
+--------------------------+-------------+----------------+-----------------+

+------------------------------------------------------+--------------------------+--------------------------+-----------------+
|json_payload                                          |parsed_struct             |null_present_str          |null_present_bool|
+------------------------------------------------------+--------------------------+--------------------------+-----------------+
|{"id": 2, "tags": ["spark", "python"], "active": true}|{true, 2, [spark, python]}|{true, 2, [spark, python]}|false            |
+------------------------------------------------------+--------------------------+--------------------------+-----------------+

+---------------------------------------------------------+-------------------+-------------------+-----------------+
|json_payload                                             |parsed_struct      |null_present_str   |null_present_bool|
+---------------------------------------------------------+-------------------+-------------------+-----------------+
|{"id": 3, "details": {"age": 30, "location": "New York"}}|{{30, New York}, 3}|{{30, New York}, 3}|false            |
+---------------------------------------------------------+-------------------+-------------------+-----------------+

+-------------------------------------------------------------------------------------------------------------+------------------------------------------------+------------------------------------------------+-----------------+
|json_payload                                                                                                 |parsed_struct                                   |null_present_str                                |null_present_bool|
+-------------------------------------------------------------------------------------------------------------+------------------------------------------------+------------------------------------------------+-----------------+
|{"id": 4, "history": [{"date": "2023-01-01", "action": "login"}, {"date": "2023-01-02", "action": "logout"}]}|{[{login, 2023-01-01}, {logout, 2023-01-02}], 4}|{[{login, 2023-01-01}, {logout, 2023-01-02}], 4}|false            |
+-------------------------------------------------------------------------------------------------------------+------------------------------------------------+------------------------------------------------+-----------------+