includeExistingFiles: false 在 Databricks Autoloader 中不起作用

问题描述 投票:0回答:2

使用自动加载器从 adls gen2 获取文件。但是,我只想摄取新文件。使用以下配置仍然无法阻止现有文件被摄取。还有其他人面临同样的问题吗?

"cloudFiles.format": "json",
"multiline": True,
"pathGlobFilter": f"*.json",
"includeExistingFiles": "false",
"cloudFiles.schemaLocation": inferred_schema_folder,
"cloudFiles.schemaEvolutionMode": "addNewColumns",
"cloudFiles.inferColumnTypes": "true",
"cloudFiles.rescuedDataColumn": "unexpected_data",
"cloudFiles.maxFilesPerTrigger": 10000,

尝试切换到文件通知模式而不是目录列表。但这也不起作用

"cloudFiles.useNotifications": "true",
"cloudFiles.subscriptionId": config.AZURE_SUB_ID,
"cloudFiles.tenantId": config.AZURE_TENANT_ID,
"cloudFiles.clientId": config.AZURE_CLIENT_ID,
"cloudFiles.clientSecret": config.AZURE_CLIENT_SECRET,
"cloudFiles.resourceGroup": "Development",
apache-spark pyspark azure-databricks spark-structured-streaming
2个回答
0
投票

根据此文档

cloudFiles.includeExistingFiles

仅当您第一次启动流时才会评估

。改变 重新启动流后此选项无效。

如果在更改此选项时重新启动流,则不会影响流。因此,请清除所有检查点并使用新检查点开始新的流媒体。

这样做,您将获得正确的流数据。此外,您还有另一个选项,

modifiedAfter
,它读取在指定时间戳之后修改的文件。

在上面提供的链接中了解有关这些选项的更多信息。


0
投票

我已经清除了检查点和架构并重新启动了流。这样做了多次,但它一次又一次地读取旧的现有文件。

使用modifiedAfter有一个副作用,如果以后我想通过切换到目录列表模式并删除/更改modifiedAfter来获取旧数据,这些文件将永远不会被读取,除非我完全清除检查点。

© www.soinside.com 2019 - 2024. All rights reserved.