获取 pyspark 损坏记录原因

问题描述 投票:0回答:1

我正在使用 Spark 读取包含一些损坏记录的 json 文件。因此,我使用选项模式 PERMISSIVE 和选项 columnNameOfCorruptRecord 来获取所有损坏的记录。一切工作正常,但是我还需要了解每行记录损坏的原因。 例如:

Json文件:

[{"name":"Jorge", "age": 25, "city": "Manhattan"},
{"name":"Natasha", "age": 34, "city": True},
{"name":"Kelly", "age": 42.5, "city": "New York"}]

架构:

schema = StructType([
  StructField("name", StringType(), True),
  StructField("age", IntegerType(), True),
  StructField("city", StringType(), True)
])

对于第 2 行,损坏的记录是“city”列。对于第 3 行,损坏的记录是“age”列。 Spark 将columnNameOfCorruptRecord 中定义的列的所有信息放在一起。有没有什么方法可以获取每行记录损坏的原因?
在我的实际情况中,我有一个包含超过 100 个字段的架构,因此我需要为每个损坏的记录确定该行损坏的原因。

json pyspark
1个回答
0
投票

您可以使用

marshmallow
python 外部库来利用损坏的数据验证。

1.进口
from marshmallow import Schema, fields, ValidationError
from pyspark.sql.functions import col, regexp_replace, udf
from pyspark.sql.types import IntegerType, StructType, StructField, StringType
2.让我们从棉花糖模式定义开始(用于损坏的记录)
class IntConstraint(fields.Field):
    def _deserialize(self, value, attr, data, **kwargs):
        if isinstance(value, int):
            return value
        else:
            raise ValidationError('Field should be int')

class Person(Schema):        
    name = fields.Str(required=True)
    age  = IntConstraint()
    city = fields.Str(required=True)

注意:我们已将自己的

IntConstraint
定义为 Marshmalow,默认接受 float 作为 int。

3.准备 Spark DataFrame
schema = StructType([
  StructField("name", StringType(), True),
  StructField("age", IntegerType(), True),
  StructField("city", StringType(), True),
  StructField("_corrupt_record", StringType(), True)
])

df = spark.read.format("json") \
    .option("mode", "PERMISSIVE") \
    .option("path", "dbfs:/FileStore/datasets/persons-1.json") \
    .schema(schema) \
    .load()

df = df \
    .withColumn("_corrupt_record", regexp_replace(col("_corrupt_record"), "True", "true")) \
    .withColumn("_corrupt_record", regexp_replace(col("_corrupt_record"), "False", "false"))
4.验证损坏的数据
@udf
def validate_user(json_string):
    try:
        if json_string is not None:
            Person().loads(json_string)
        return None
    except ValidationError as error:
        return error.messages


validated_df = df.withColumn("validation_error", validate_user(col("_corrupt_record")))

validated_df.show(truncate = False)

输出:

+-----+----+---------+-------------------------------------------------+----------------------------+
|name |age |city     |_corrupt_record                                  |validation_error            |
+-----+----+---------+-------------------------------------------------+----------------------------+
|Jorge|25  |Manhattan|null                                             |null                        |
|null |null|null     |{"name":"Natasha", "age": 34, "city": true}      |{city=[Not a valid string.]}|
|Kelly|null|New York |{"name":"Kelly", "age": 42.5, "city": "New York"}|{age=[Field should be int]} |
+-----+----+---------+-------------------------------------------------+----------------------------+
© www.soinside.com 2019 - 2024. All rights reserved.