在 Airflow DAG 参数中使用文本字段

问题描述 投票:0回答:1

可以选择使用 Param 类将字符串变量传递到 Airflow DAG 中。

但是,我想传递整个文本,在我的例子中:yaml 作为文本。

我尝试使用 Param 类的数组/对象类型来完成此操作,但“数组”类型只是截断 yaml 文件的每一行,而对象类型需要 JSON 输入。

我无法在外部将 yaml 转换为 json,所以我唯一的选择是将 yaml 文件直接传递给 DAG 参数。

那么如何将 yaml 文件传递到 Airflow 文本字段?

python json yaml airflow
1个回答
0
投票

一种解决方案是使用

Param
类来接收字符串。在内部,您必须在使用它之前将其转换为 YAML:

  1. 首先需要将DAG参数定义为字符串类型。

     dag_params = {
     'yaml_config': Param(
         '',
         type='string',
         description='YAML configuration as a string'
     )
    }
    
  2. 您需要准备运算符(在我的例子中,

    PythonOperator
    )来执行从字符串到 YAML 的转换。

     def parse_yaml(**kwargs):
         yaml_text = kwargs['dag_run'].conf['yaml_config']
         config = yaml.safe_load(yaml_text)
         print("Config loaded from YAML:", config)
    
     parse_config = PythonOperator(
         task_id='parse_yaml',
         python_callable=parse_yaml
     )
    
  3. 最后,您将可以使用配置。

© www.soinside.com 2019 - 2024. All rights reserved.