我正在使用 Spark 读取包含一些损坏记录的 json 文件。因此,我使用选项模式 PERMISSIVE 和选项 columnNameOfCorruptRecord 来获取所有损坏的记录。一切工作正常,但是我还需要了解每行记录损坏的原因。
例如:
Json文件:
[{"name":"Jorge", "age": 25, "city": "Manhattan"},
{"name":"Natasha", "age": 34, "city": True},
{"name":"Kelly", "age": 42.5, "city": "New York"}]
架构:
schema = StructType([
StructField("name", StringType(), True),
StructField("age", IntegerType(), True),
StructField("city", StringType(), True)
])
对于第 2 行,损坏的记录是“city”列。对于第 3 行,损坏的记录是“age”列。 Spark 将columnNameOfCorruptRecord 中定义的列的所有信息放在一起。有没有什么方法可以获取每行记录损坏的原因?
在我的实际情况中,我有一个包含超过 100 个字段的架构,因此我需要为每个损坏的记录确定该行损坏的原因。
您可以使用
marshmallow
python 外部库来利用损坏的数据验证。
from marshmallow import Schema, fields, ValidationError
from pyspark.sql.functions import col, regexp_replace, udf
from pyspark.sql.types import IntegerType, StructType, StructField, StringType
class IntConstraint(fields.Field):
def _deserialize(self, value, attr, data, **kwargs):
if isinstance(value, int):
return value
else:
raise ValidationError('Field should be int')
class Person(Schema):
name = fields.Str(required=True)
age = IntConstraint()
city = fields.Str(required=True)
注意:我们已将自己的
IntConstraint
定义为 Marshmalow,默认接受 float 作为 int。
schema = StructType([
StructField("name", StringType(), True),
StructField("age", IntegerType(), True),
StructField("city", StringType(), True),
StructField("_corrupt_record", StringType(), True)
])
df = spark.read.format("json") \
.option("mode", "PERMISSIVE") \
.option("path", "dbfs:/FileStore/datasets/persons-1.json") \
.schema(schema) \
.load()
df = df \
.withColumn("_corrupt_record", regexp_replace(col("_corrupt_record"), "True", "true")) \
.withColumn("_corrupt_record", regexp_replace(col("_corrupt_record"), "False", "false"))
@udf
def validate_user(json_string):
try:
if json_string is not None:
Person().loads(json_string)
return None
except ValidationError as error:
return error.messages
validated_df = df.withColumn("validation_error", validate_user(col("_corrupt_record")))
validated_df.show(truncate = False)
+-----+----+---------+-------------------------------------------------+----------------------------+
|name |age |city |_corrupt_record |validation_error |
+-----+----+---------+-------------------------------------------------+----------------------------+
|Jorge|25 |Manhattan|null |null |
|null |null|null |{"name":"Natasha", "age": 34, "city": true} |{city=[Not a valid string.]}|
|Kelly|null|New York |{"name":"Kelly", "age": 42.5, "city": "New York"}|{age=[Field should be int]} |
+-----+----+---------+-------------------------------------------------+----------------------------+