Spark 流处理不同文件格式的文件夹

问题描述 投票:0回答:0

我有一个“通用”spark 结构化流作业,它监视顶级文件夹(伞)并遍历所有子文件夹(kafka 主题数据),然后将每个 Kafka 主题数据文件夹作为增量写入单独的输出容器中。每个 Kafka 主题数据文件夹都有自己的输出文件夹。此外,完成压缩并创建外部表。目的是让它尽可能通用,这样就不需要为每个 Kakfka 主题数据文件夹编写一个单独的 spark 结构化流作业。

当然是现在的问题。由于历史原因,一些 Kafka 数据主题文件夹的文件为 avro,而对于某些 .,它是 parquet。由于某些原因,我们现在正转向 avro 成为着陆区的一种常见格式。但问题是数据在镶木地板中的主题。需要注意的是:一个 Kafka 数据主题只有一种格式(avro 或 parquet)。但问题是我正在尝试编写这个通用的火花流作业,查看顶级文件夹。

将格式指定为 avro 或 parquet,在执行

readstream
时抛出异常,然后它抛出一个非常通用的异常作为
AnalysisException
,这也可能是由其他原因引起的(不仅仅是推断模式的问题)。现在,我只是在进行全面的异常处理,但它隐藏了真正的错误,这并不好。在执行格式为镶木地板的火花流作业时,如何完全忽略 avro 主题文件夹?我已经尝试过
format
pathGlobFilter
但是当它进入avro文件夹时,它会过滤掉所有avro文件然后得到一个
empty
文件夹再次抛出异常。

请注意名称中没有模式可用于区分包含 avro 和 parquet 的子文件夹。下面图解说明

说得够多了,代码片段,描述了问题。检查最后一行,尤其是我在这里寻求帮助的

try catch

top_level_folder_path= f"abfss://{sourcecontainer}@{datalakename}.dfs.core.windows.net/toplevelfolder"

for datahub_domain in dbutils.fs.ls(top_level_folder_path):
    for datahub_topic in topicsearchpath:
        ....derive some variables here.....
        ...............................
        # some config
         cloudfile = {
                    "cloudFiles.format": "parquet",
                    "cloudFiles.includeExistingFiles": "true",
                    "cloudFiles.inferColumnTypes": "true",
                    "cloudFiles.schemaLocation": f"abfss://raw@{datalakename}.dfs.core.windows.net/{originaldomainname}/autoloader/schemas/{originaltopicname}/",
                    "cloudFiles.schemaEvolutionMode": "addNewColumns",
                    "cloudFiles.allowOverwrites": "true",
                    "ignoreCorruptFiles": "true",
                    "ignoreMissingFiles": "true",
                }
                try:
                    df = (
                        spark.readStream.format("cloudFiles")
                        .options(**cloudfile)
                        .load(datahub_topic.path)
                    )
                    dstreamQuery = (
                        df.writeStream.format("delta")
                        .outputMode("append")
                        .queryName(f"{schema_name}_raw_{table_name}")
                        .option(
                            "checkpointLocation",
                            f"abfss://raw@{datalakename}.dfs.core.windows.net/autoloader/checkpoint/{originaldomainname}/{originaltopicname}/",
                        )
                        .option("mergeSchema", "true")
                        .partitionBy("Year", "Month", "Day")
                        .trigger(availableNow=True)
                        .start(
                            f"abfss://raw@{datalakename}.dfs.core.windows.net/{originaldomainname}/delta/{originaltopicname}"
                        )
                    )
                    while len(spark.streams.active) > 0:
                        spark.streams.awaitAnyTermination()
                except Exception as e: # I DO NOT WANT TO DO THIS
                    logger.warning(f"Error reading stream: {str(e)}") #### VERY BAD
apache-spark databricks spark-streaming
© www.soinside.com 2019 - 2024. All rights reserved.