PySpark dataframe from_json conversion

Question · Votes: 0 · Answers: 1

I have the JSON below, which I am reading from Kafka, and I am trying to convert it to a StructType using the from_json function.

from pyspark.sql.types import StructType, StructField, StringType, LongType

schema_session_start = StructType([
    StructField("ID", StringType()),
    StructField("SID", StringType()),
    StructField("EP", LongType()),
    StructField("IP", StringType()),
    StructField("LN", StringType()),
    StructField("VN", StringType()),
    StructField("DV", StructType([
        StructField("MK", StringType()),
        StructField("MDL", StringType()),
        StructField("OS", StringType()),
        StructField("OSVN", StringType()),
        StructField("AR", StringType())
    ])),
    StructField("MC", StringType()),
    StructField("FN", StringType()),
    StructField("NW", StructType([
        StructField("TP", StringType())
    ])),
    StructField("AL", StringType()),
    StructField("EN", StringType())
])
value:
{"ID":"651551912131b2.07017577","SID":"169156360280217644","EP":1695895952305,"IP":"10.10.10.10","LN":"","VN":"2.4.0.0","DV":{"MK":"Jio","MDL":"JHSD200","OS":"JioOS 2","OSVN":"9","AR":"armeabi-v7a"},"MC":"02:00:00:00:00:00","FN":true,"NW":{"TP":"wifi_5"},"AL":"GRIPdemo","EN":"Session_Start"}

CN: Session_Start
array_df = condition_df.withColumn("value_json",from_json(col("value"),when(condition_df.EN == "Session_Start", schema_session_start)))

When I attempt the conversion I get the following error:

ERROR:root:An error occurred: 'StructField' object has no attribute '_get_object_id'

apache-spark pyspark apache-kafka apache-spark-sql spark-structured-streaming
1 Answer

The second argument of from_json should be either a string with a schema or a StructType (see the docs), but in your case it is a Column. If you want to apply a specific schema only to a given event type, you need to do it differently: move the when outside of from_json, like this:

array_df = condition_df.withColumn("value_json",
  when(condition_df.EN == "Session_Start",
     from_json(col("value"), schema_session_start)))