我有一个“通用”spark 结构化流作业,它监视顶级文件夹(伞)并遍历所有子文件夹(kafka 主题数据),然后将每个 Kafka 主题数据文件夹作为增量写入单独的输出容器中。每个 Kafka 主题数据文件夹都有自己的输出文件夹。此外,完成压缩并创建外部表。目的是让它尽可能通用,这样就不需要为每个 Kakfka 主题数据文件夹编写一个单独的 spark 结构化流作业。
当然是现在的问题。由于历史原因,一些 Kafka 数据主题文件夹的文件为 avro,而对于某些 .,它是 parquet。由于某些原因,我们现在正转向 avro 成为着陆区的一种常见格式。但问题是数据在镶木地板中的主题。需要注意的是:一个 Kafka 数据主题只有一种格式(avro 或 parquet)。但问题是我正在尝试编写这个通用的火花流作业,查看顶级文件夹。
将格式指定为 avro 或 parquet,在执行
readstream
时抛出异常,然后它抛出一个非常通用的异常作为 AnalysisException
,这也可能是由其他原因引起的(不仅仅是推断模式的问题)。现在,我只是在进行全面的异常处理,但它隐藏了真正的错误,这并不好。在执行格式为镶木地板的火花流作业时,如何完全忽略 avro 主题文件夹?我已经尝试过format
和pathGlobFilter
但是当它进入avro文件夹时,它会过滤掉所有avro文件然后得到一个empty
文件夹再次抛出异常。
请注意名称中没有模式可用于区分包含 avro 和 parquet 的子文件夹。下面图解说明
说得够多了,代码片段,描述了问题。检查最后一行,尤其是我在这里寻求帮助的
try catch
。
top_level_folder_path= f"abfss://{sourcecontainer}@{datalakename}.dfs.core.windows.net/toplevelfolder"
for datahub_domain in dbutils.fs.ls(top_level_folder_path):
for datahub_topic in topicsearchpath:
....derive some variables here.....
...............................
# some config
cloudfile = {
"cloudFiles.format": "parquet",
"cloudFiles.includeExistingFiles": "true",
"cloudFiles.inferColumnTypes": "true",
"cloudFiles.schemaLocation": f"abfss://raw@{datalakename}.dfs.core.windows.net/{originaldomainname}/autoloader/schemas/{originaltopicname}/",
"cloudFiles.schemaEvolutionMode": "addNewColumns",
"cloudFiles.allowOverwrites": "true",
"ignoreCorruptFiles": "true",
"ignoreMissingFiles": "true",
}
try:
df = (
spark.readStream.format("cloudFiles")
.options(**cloudfile)
.load(datahub_topic.path)
)
dstreamQuery = (
df.writeStream.format("delta")
.outputMode("append")
.queryName(f"{schema_name}_raw_{table_name}")
.option(
"checkpointLocation",
f"abfss://raw@{datalakename}.dfs.core.windows.net/autoloader/checkpoint/{originaldomainname}/{originaltopicname}/",
)
.option("mergeSchema", "true")
.partitionBy("Year", "Month", "Day")
.trigger(availableNow=True)
.start(
f"abfss://raw@{datalakename}.dfs.core.windows.net/{originaldomainname}/delta/{originaltopicname}"
)
)
while len(spark.streams.active) > 0:
spark.streams.awaitAnyTermination()
except Exception as e: # I DO NOT WANT TO DO THIS
logger.warning(f"Error reading stream: {str(e)}") #### VERY BAD