气流传感器不完整文件列表

问题描述 投票:0回答:2

我真的很想知道,当文件未完全复制/加载到目标数据存储时,气流传感器如何工作。举个例子:我们有一个文件系统,传感器检查其中的文件。我们正在将一个大文件复制到文件夹中,这确实需要一些时间。气流传感器会消耗不完整的文件还是会等待文件完全加载?

我真的在寻找答案,但没有找到类似的东西

python airflow etl airflow-taskflow
2个回答
0
投票

你可以在FileSensor的代码中看到答案。

答案是操作系统负责回答。例如,在我下载大文件“Mac”时进行的测试中,在下载文件时返回 True

这个想法是,当你复制一个大文件时,给它一个临时扩展名,并在复制完成后将其替换为真实的扩展名

 def poke(self, context: Context):
    hook = FSHook(self.fs_conn_id)
    basepath = hook.get_path()
    full_path = os.path.join(basepath, self.filepath)
    self.log.info("Poking for file %s", full_path)

    for path in glob(full_path, recursive=self.recursive):
        if os.path.isfile(path):
            mod_time = datetime.datetime.fromtimestamp(os.path.getmtime(path)).strftime("%Y%m%d%H%M%S")
            self.log.info("Found File %s last modified: %s", str(path), mod_time)
            return True

        for _, _, files in os.walk(path):
            if len(files) > 0:
                return True
    return False

0
投票

其他解决方案是实现一个

NewFileSensor
类,类似于原始
FileSensor
,该
poke()
方法不仅会测试该文件是否存在,而且会测试其大小在一定时间内保持不变(10, 60秒).. 如果文件仍在上传,那么它的大小也会逐渐改变,传感器会检测到这一点。也许检查更新时间戳也可以..

如果上传不完整,这当然没有帮助。然后您的上传应用程序应该以某种方式控制它,并在上传后将文件从 *.tmp 重命名为 *.ext,或者创建第二个空标志文件,如 filename.done。可以用标准检测

FileSensor

© www.soinside.com 2019 - 2024. All rights reserved.