我正在尝试创建具有以下2个任务的动态气流:任务1:创建具有生成的UUID作为其名称的一部分的文件任务2:对这些文件进行检查
所以我定义了一个变量'FILE_UUID'并将其设置如下:str(uuid.uuid4())。并且还创建了一个常量文件名:MY_FILE ='{file_uuid} _file.csv'.format(file_uuid = FILE_UUID}
然后 - 任务1是一个bashOperator,它将MY_FILE作为命令的一部分,并成功创建一个文件。我可以看到生成的文件包含名称中的特定UUID,
TASK 2失败是一个Python_perator,它将MY_FILE作为op_args。但无法访问该文件。日志显示它尝试访问具有不同UUID的文件。
为什么我的“常量”是在每项任务上单独运行?有没有办法防止这种情况发生?
我正在使用Airflow 1.10,我的执行者是LocalExecutor。
我尝试在“with DAG”之外设置常量,并在其中,也尝试使用宏,但PythonOperator只是使用宏字符串字面上使用它们保存的值。
您必须记住,DAG定义文件是一种“配置脚本”,而不是运行DAG的实际可执行文件。任务在完全不同的环境中执行,大多数时候甚至不在同一台机器上。可以把它想象成一个配置XML来设置你的任务,然后它们在云中的其他机器上构建和执行 - 但它是Python而不是XML。
总而言之 - 您的DAG代码是Python,但它不是在您的任务的运行时中执行的代码。因此,如果您在那里生成随机uuid,它将在未知时间和多次进行评估 - 对于每个任务,在不同的机器上。
要使其在任务中保持一致,您需要找到另一种方法,例如: