Why does reading JSON with Spark require explicitly adding a "_corrupt_record" column to the schema when schema validation is needed?

Question

When I read JSON via Spark (using Scala):

// read.json already returns a DataFrame, so the toDF() call below is a no-op
val rdd = spark.sqlContext.read.json("/Users/sanyam/Downloads/data/input.json")
val df = rdd.toDF()
df.show()
println(df.schema)

// Commented out: reading a second file against an explicit schema with
// _corrupt_record appended (referred to in the question below)
//val schema = df.schema.add("_corrupt_record", org.apache.spark.sql.types.StringType, true)
//val rdd1 = spark.sqlContext.read.schema(schema).json("/Users/sanyam/Downloads/data/input_1.json")
//rdd1.toDF().show()

This results in the following DataFrame:

+--------+----------------+----------+----------+----------+--------------------+----+--------------------+-------+---+---------+--------------+--------------------+--------------------+------------+----------+--------------------+
|   appId|    appTimestamp|appVersion|  bankCode|bankLocale|                data|date|         environment|  event| id|  logTime|       logType|                msid|                muid|       owner|recordType|                uuid|
+--------+----------------+----------+----------+----------+--------------------+----+--------------------+-------+---+---------+--------------+--------------------+--------------------+------------+----------+--------------------+
|services| 1 446026400000 |    2.10.4|loadtest81|        en|Properties : {[{"...|user|af593c4b000c29605c90|Payment|  1|152664593|AppActivityLog|90022384526564ffc...|22488dcc8b29-235c...|productOwner|event-logs|781ce0aaaaa82313e8c9|
|services| 1 446026400000 |    2.10.4|loadtest81|        en|Properties : {[{"...|user|af593c4b000c29605c90|Payment|  1|152664593|AppActivityLog|90022384526564ffc...|22488dcc8b29-235c...|productOwner|event-logs|781ce0aaaaa82313e8c9|
+--------+----------------+----------+----------+----------+--------------------+----+--------------------+-------+---+---------+--------------+--------------------+--------------------+------------+----------+--------------------+

StructType(StructField(appId,StringType,true), StructField(appTimestamp,StringType,true), StructField(appVersion,StringType,true), StructField(bankCode,StringType,true), StructField(bankLocale,StringType,true), StructField(data,StringType,true), StructField(date,StringType,true), StructField(environment,StringType,true), StructField(event,StringType,true), StructField(id,LongType,true), StructField(logTime,LongType,true), StructField(logType,StringType,true), StructField(msid,StringType,true), StructField(muid,StringType,true), StructField(owner,StringType,true), StructField(recordType,StringType,true), StructField(uuid,StringType,true))

If I want to apply the same validation to any other JSON file I read, I keep the schema in a variable and pass it to .schema as an argument [see the commented-out lines in the code above]. But then even corrupt records do not end up in the _corrupt_record column (which should happen by default); instead, the bad records are parsed as null in every column, which causes data loss because there is no trace of them left. When I explicitly add a _corrupt_record column to the schema, everything works fine and the corrupt records land in that column. Why is that? (Also, when Spark infers the schema of a malformed JSON file on its own, it handles it automatically by creating a _corrupt_record column, so why does schema validation require adding that column explicitly up front?)
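For reference, a minimal sketch of the workaround described above, assuming the same file paths and a Spark 2.x session named spark; the _corrupt_record column is appended to the inferred schema before the second read:

import org.apache.spark.sql.types.StringType

// Build the target schema from a known-good file, then append the
// corrupt-record column so malformed rows are captured instead of
// coming back as all-null.
val inferredSchema = spark.read.json("/Users/sanyam/Downloads/data/input.json").schema
val schemaWithCorrupt = inferredSchema.add("_corrupt_record", StringType, true)

val validated = spark.read
  .schema(schemaWithCorrupt)
  .json("/Users/sanyam/Downloads/data/input_1.json")

// Note: since Spark 2.3, queries that reference only the corrupt-record
// column are disallowed, so cache the DataFrame before filtering on it.
validated.cache()
val badRows  = validated.filter(validated("_corrupt_record").isNotNull)
val goodRows = validated.filter(validated("_corrupt_record").isNull)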

json apache-spark apache-spark-sql rdd
1 Answer

Reading corrupt JSON data without a schema returns a DataFrame whose schema is [_corrupt_record: string]. Here, however, you are reading the data against a schema it does not match, so the whole row comes back as null. When you explicitly add _corrupt_record to that schema, the entire JSON record ends up in that column and, I assume, null in all the other columns.
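As a side note, when an explicit schema is supplied, the JSON reader's mode option controls what happens to malformed rows; a brief sketch, reusing the schemaWithCorrupt value from the sketch above:

// PERMISSIVE (the default): malformed rows become all-null, or land in the
// corrupt-record column when it is part of the schema.
// DROPMALFORMED: malformed rows are silently dropped.
// FAILFAST: the read throws an exception on the first malformed record.
val permissiveDf = spark.read
  .schema(schemaWithCorrupt)
  .option("mode", "PERMISSIVE")
  .option("columnNameOfCorruptRecord", "_corrupt_record")
  .json("/Users/sanyam/Downloads/data/input_1.json")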
