IgnoreMissingFiles 选项在某些情况下不起作用

问题描述 投票:0回答:1

我遇到了一个奇怪的问题,但我似乎找不到原因。

目标: 使用 databricks 实验室的 dbx 时,在 Spark 结构化流中具有

spark.sql.files.ignoreMissingFiles
(本质上是 Spark 代码)。我有一些文件已收到通知,但现在已从存储中丢失。这可以通过

在数据块中轻松模拟
  1. 首先运行自动加载作业(在批处理模式下,availableNow = true)
  2. 单击外部位置(流作业所依赖的)上的“测试连接”按钮。
  3. 再次运行自动加载器

如果没有ignoreMissingFiles,它会因为缺少一些“validate_credential”文件而失败。

我可以通过设置ignoreMissingFiles的数据源选项或

spark.sql.files.ignoreMissingFiles
的集群配置来让它在笔记本中工作。

例如,这有效:

(
    spark.readStream
    .format("cloudFiles")
    .options(**cloudFilesOptions)
    .option("ignoreMissingFiles",True)
    .schema(input_schema)
    .load(source_path)
    .writeStream
    .trigger(availableNow=True)
    .option("checkpointLocation", checkpoint_path)
    .toTable("test_autoloader2")
)

此外,以下两种解决方法也有效

# inside the notebook
spark.conf.set("spark.sql.files.ignoreMissingFiles", "true")

或者在集群的spark配置中设置。

但是,当使用 DBX 时,同样不起作用。我发现它仅适用于以

cloudfiles
开头的选项,但不适用于其他选项。这意味着它不能与普通的 Spark 配置一起使用,而只能与 databricks 自动加载器特定选项一起使用。

例如这不起作用(单击测试连接按钮后,因为它仍然抱怨丢失文件)

def _process_timeseries(
        self,
    ) -> None:
        """Ingests the measurements data into the bronze (raw) layer."""
        # Read the input data
        self.spark.sparkContext._conf.set("spark.sql.files.ignoreMissingFiles","true")

        df = (
            self.spark.readStream.format("cloudFiles")
            .schema(ext_schema)
            .option("cloudFiles.format", "json")
            .option(
                "cloudFiles.schemaLocation",
                f"{self.checkpoint_path}/{self.catalog_name}/etl_bronze_timeseries{self.postfix}",
            )
            .option("cloudFiles.useNotifications", "true")
            .option("cloudFiles.subscriptionId", self.sub_id)
            .option("cloudFiles.tenantId", self.tenant_id)
            .option("cloudFiles.resourceGroup", self.rg)
            .option(
                "cloudFiles.clientSecret",
                self.dbutils.secrets.get(scope="keyvault_secrets", key="databricks-autoloader-clientsecret"),
            )
            .option("cloudFiles.clientId", self.client_id)
            .option("cloudFiles.backfillInterval", "1 day")
            .option("ignoreMissingFiles", true)
            .load(f"{self.input_path}/")

        )

此方法(上面定义)是从 DBX 中定义的 Task 类派生的类的一部分,即该类及其 init 方法

class ETLBronzeTimeseries(Task):
    def __init__(self, spark=None, source_container=None, postfix=""):
    super().__init__(spark)
    self.input_path = someval
    self.postfix = someval

    self.checkpoint_path = self.spark.conf.get("checkpoint_path")
    ....other stuffs

super().__init__(spark)
是返回sparkSession的地方(在dbx中定义),在
Task
类中使用

@staticmethod
def _prepare_spark(spark) -> SparkSession:
    if not spark:
        return SparkSession.builder.getOrCreate()
    else:
        return spark

我想知道 Spark 会话是否以某种方式没有更新,所以我也尝试将其更改为

return SparkSession.builder.config("spark.sql.files.ignoreMissingFiles","true").getOrCreate()

但这也不起作用。

我还尝试更改作业使用的集群来在那里设置配置,但这不起作用,我真的不明白为什么,因为应该读取集群配置。

我怀疑 SparkSession 没有得到更新,而且我在其他地方也没有更新,我注意到可能需要停止关联的 Spark 上下文,然后重新创建,但问题是:我更改了 SparkSession 创建(上面提到的)以采用该方法考虑配置。

我错过了什么?

我不断收到的错误是:

com.databricks.sql.io.FileReadException: Error while reading file abfss:[email protected]/validate_credential_2023-09-20-06-44-32. [CLOUD_FILE_SOURCE_FILE_NOT_FOUND] A file notification was received for file: abfss://[email protected]/validate_credential_2023-09-20-06-44-32 but it does not exist anymore. Please ensure that files are not deleted before they are processed. To continue your stream, you can set the Spark SQL configuration spark.sql.files.ignoreMissingFiles to true.

当然我不想使用笔记本来编写生产级代码。

DBx:这里

有趣的是: 当设置 .option("ignoreMissingFiles", true) 并转到 Spark UI 时,对于失败的阶段,我可以看到数据帧属性为(但它无法识别它)。

apache-spark databricks azure-databricks databricks-autoloader databricks-dbx
1个回答
0
投票

我刚刚花了整个早上的时间来思考这个问题,我想我已经弄清楚了。这些“validate_credential_xx”文件出现的原因是,当您向其配置外部位置对象时,Unity Catalog 似乎使用此文件来验证它是否可以访问存储容器。验证通过后,会再次删除该文件。

当您配置了 Databricks Autoloader 时,它将部署一个事件网格 + 存储队列来监视存储容器中的任何传入文件。那么发生的事情是:

  1. Unity Catalog 在容器中创建一个文件以验证它是否可以访问它
  2. 事件网格触发一个名为“validate_credential_xx”的文件已创建的事件
  3. 消息最终进入存储队列,由 Databricks Autoloader 处理。
  4. Unity Catalog 从容器中删除文件“validate_credential_xxx”

现在,当您使用 Databricks Autoloader 启动作业时,它将在队列中查找要处理的文件。它发现一条消息说它需要加载“validate_credential_xxx”,但它在容器中找不到该文件。这会导致您看到的错误。

我通过向事件网格订阅添加一个额外的过滤器解决了这个问题,其中“主题”可能不以“/blobServices/default/containers/“您的容器”/blob/validate_credential_”开头。这可以防止将消息发送到此类文件的队列。

请注意,在初始 Databricks 失败后,该项目将从队列中删除,但仍存储在检查点位置的 RocksDB 中。这解释了为什么错误从队列中删除后不断弹出。

© www.soinside.com 2019 - 2024. All rights reserved.