使用当前配置,每 1 小时我们就会得到一个包含新数据的新文件夹。
我正在利用文件通知,并且我不想切换到目录列表。但是,我遇到了最新文件夹中不断更新的 CSV 文件的问题。当自动加载器尝试读取当时正在更新的 CSV 文件时,这会导致作业失败。我正在探索排除最新文件夹被读取的方法,并遇到了 moddedBefore 参数,但我不确定它与 FileNotification 的兼容性。
modifiedBefore
是自动加载器中的通用选项,可与文件通知模式一起使用。
您提到文件每小时到达一次,并且最新文件更新非常频繁,当您使用自动加载器进行增量加载时会导致错误。
为了避免这种情况,您可以提供一个路径,该路径的模式与除最新文件之外的所有文件相匹配,或者使用
modifiedBefore
选项。
在这两种情况下,您都应该知道时间戳。
例如,如果您不需要
13:00:00
之后的数据,则可以使用如下模式:
"/2023-12-20T(0[0-9]|1[0-2]).[0-9][0-9].[0-9][0-9]Z/"
有关模式的更多信息,您可以参考此文档。
或者,您可以使用
modifiedBefore
选项:
autoloader_config = {
"cloudFiles.format":"csv",
"cloudFiles.useNotifications": "true",
"cloudFiles.resourceGroup":resourcegroup,
"cloudFiles.clientId": client_id,
"cloudFiles.clientSecret": client_secret,
"cloudFiles.tenantId": tenant_id,
"cloudFiles.connectionString":conn_string,
"cloudFiles.subscriptionId": subscription_id,
"cloudFiles.schemaLocation":schema_location,
"header": True,
"modifiedBefore":"2023-12-20 13:00:00.000000 UTC+5:30"
}
如果您想根据最后一小时进行过滤,可以使用以下代码来实现:
from datetime import datetime, timedelta
from zoneinfo import ZoneInfo
india_dt = datetime.now(tz=ZoneInfo("Asia/Kolkata"))
filter = india_dt - timedelta(hours=1)
这给出:
2023-12-20 11:41:39.862054+05:30
然后您可以在
modifiedBefore
选项中使用该过滤器。
注意:您需要指定与每小时创建的文件夹名称匹配的区域。