Autolaoder - 带有修改前的文件通知

问题描述 投票:0回答:1

使用当前配置,每 1 小时我们就会得到一个包含新数据的新文件夹。

See image here.

我正在利用文件通知,并且我不想切换到目录列表。但是,我遇到了最新文件夹中不断更新的 CSV 文件的问题。当自动加载器尝试读取当时正在更新的 CSV 文件时,这会导致作业失败。我正在探索排除最新文件夹被读取的方法,并遇到了 moddedBefore 参数,但我不确定它与 FileNotification 的兼容性。

pyspark streaming azure-databricks databricks-autoloader
1个回答
0
投票

modifiedBefore
是自动加载器中的通用选项,可与文件通知模式一起使用。

您提到文件每小时到达一次,并且最新文件更新非常频繁,当您使用自动加载器进行增量加载时会导致错误。

为了避免这种情况,您可以提供一个路径,该路径的模式与除最新文件之外的所有文件相匹配,或者使用

modifiedBefore
选项。

在这两种情况下,您都应该知道时间戳。

例如,如果您不需要

13:00:00
之后的数据,则可以使用如下模式:

"/2023-12-20T(0[0-9]|1[0-2]).[0-9][0-9].[0-9][0-9]Z/"

有关模式的更多信息,您可以参考此文档

或者,您可以使用

modifiedBefore
选项:

autoloader_config = {
"cloudFiles.format":"csv",
"cloudFiles.useNotifications": "true",
"cloudFiles.resourceGroup":resourcegroup,
"cloudFiles.clientId": client_id,
"cloudFiles.clientSecret": client_secret,
"cloudFiles.tenantId": tenant_id,
"cloudFiles.connectionString":conn_string,
"cloudFiles.subscriptionId": subscription_id,
"cloudFiles.schemaLocation":schema_location,
"header": True,
"modifiedBefore":"2023-12-20 13:00:00.000000 UTC+5:30"
}

如果您想根据最后一小时进行过滤,可以使用以下代码来实现:

from datetime import datetime, timedelta
from zoneinfo import ZoneInfo

india_dt = datetime.now(tz=ZoneInfo("Asia/Kolkata"))
filter  = india_dt -  timedelta(hours=1)

这给出:

2023-12-20 11:41:39.862054+05:30

然后您可以在

modifiedBefore
选项中使用该过滤器。

注意:您需要指定与每小时创建的文件夹名称匹配的区域。

© www.soinside.com 2019 - 2024. All rights reserved.