如何在(Py)Spark结构化流中捕获不正确的(损坏的)JSON记录?

问题描述 投票:1回答:1

我有一个Azure Eventhub,它正在传输数据(JSON格式)。我将其读取为Spark数据帧,使用from_json(col("body"), schema)解析传入的“ body”,其中schema是预定义的。在代码中,它看起来像:

from pyspark.sql.functions import col, from_json
from pyspark.sql.types import *

schema = StructType().add(...) # define the incoming JSON schema 

df_stream_input = (spark
.readStream
.format("eventhubs")
.options(**ehConfInput)
.load()
.select(from_json(col("body").cast("string"), schema)
)

现在=如果传入的JSON模式与定义的模式之间存在某些不一致的情况](例如,源eventhub开始以新格式发送数据而不会发出通知),[from_json()函数不会引发错误=而是将NULL放入字段,这些字段在我的schema定义中存在,但在eventhub发送的JSON中不存在我想捕获此信息并将其记录在某处(Spark的log4j,Azure Monitor,警告电子邮件等)。

我的问题是:如何做到这一点的最佳方法是什么。

我的一些想法:

  1. 我首先想到的是拥有一个UDF,它检查NULLs,如果有任何问题,它将引发Exception。我相信不可能通过PySpark将日志发送到log4j,因为“ spark”上下文无法在UDF中(在工作人员上)启动,并且一个人想使用默认值:

    log4jLogger = sc._jvm.org.apache.log4jlogger = log4jLogger.LogManager.getLogger('PySpark Logger')

  2. 我能想到的第二件事是使用“ foreach / foreachBatch”函数并将此检查逻辑放在这里。
  3. 但是我觉得这两种方法都非常相似。...我希望Spark内置一些用于这些目的的东西。

我有一个Azure Eventhub,它正在流式传输数据(JSON格式)。我将其读取为Spark数据帧,并使用其中预定义了模式的from_json(col(“ body”),schema)解析传入的“ body”。在代码中...

json pyspark pyspark-sql spark-structured-streaming azure-eventhub
1个回答
1
投票
事实证明,我误以为columnNameOfCorruptRecord选项可能是一个答案。它不起作用。
© www.soinside.com 2019 - 2024. All rights reserved.