生成uuid并在Airflow DAG中使用它

问题描述 投票:0回答:1

我正在尝试创建具有以下2个任务的动态气流:任务1:创建具有生成的UUID作为其名称的一部分的文件任务2:对这些文件进行检查

所以我定义了一个变量'FILE_UUID'并将其设置如下:str(uuid.uuid4())。并且还创建了一个常量文件名:MY_FILE ='{file_uuid} _file.csv'.format(file_uuid = FILE_UUID}

然后 - 任务1是一个bashOperator,它将MY_FILE作为命令的一部分,并成功创建一个文件。我可以看到生成的文件包含名称中的特定UUID,

TASK 2失败是一个Python_perator,它将MY_FILE作为op_args。但无法访问该文件。日志显示它尝试访问具有不同UUID的文件。

为什么我的“常量”是在每项任务上单独运行?有没有办法防止这种情况发生?

我正在使用Airflow 1.10,我的执行者是LocalExecutor。

我尝试在“with DAG”之外设置常量,并在其中,也尝试使用宏,但PythonOperator只是使用宏字符串字面上使用它们保存的值。

constants uuid airflow
1个回答
0
投票

您必须记住,DAG定义文件是一种“配置脚本”,而不是运行DAG的实际可执行文件。任务在完全不同的环境中执行,大多数时候甚至不在同一台机器上。可以把它想象成一个配置XML来设置你的任务,然后它们在云中的其他机器上构建和执行 - 但它是Python而不是XML。

总而言之 - 您的DAG代码是Python,但它不是在您的任务的运行时中执行的代码。因此,如果您在那里生成随机uuid,它将在未知时间和多次进行评估 - 对于每个任务,在不同的机器上。

要使其在任务中保持一致,您需要找到另一种方法,例如:

  • 使用XCOM使得第一个任务使用它获得的uuid,然后将其写入XCOM以供所有下游任务使用。
  • 在你的管道,来源,日期或其他任何东西上锚定你的uuid(例如,如果这是一项日常任务,你可以通过日期部分混合一些dag /任务细节来构建你的uuid等等 - 无论什么会使你的uuid对于所有任务都是一样的,但对于独特的日子是唯一的
© www.soinside.com 2019 - 2024. All rights reserved.