可以选择使用 Param 类将字符串变量传递到 Airflow DAG 中。
但是,我想传递整个文本,在我的例子中:yaml 作为文本。
我尝试使用 Param 类的数组/对象类型来完成此操作,但“数组”类型只是截断 yaml 文件的每一行,而对象类型需要 JSON 输入。
我无法在外部将 yaml 转换为 json,所以我唯一的选择是将 yaml 文件直接传递给 DAG 参数。
那么如何将 yaml 文件传递到 Airflow 文本字段?
一种解决方案是使用
Param
类来接收字符串。在内部,您必须在使用它之前将其转换为 YAML:
首先需要将DAG参数定义为字符串类型。
dag_params = {
'yaml_config': Param(
'',
type='string',
description='YAML configuration as a string'
)
}
您需要准备运算符(在我的例子中,
PythonOperator
)来执行从字符串到 YAML 的转换。
def parse_yaml(**kwargs):
yaml_text = kwargs['dag_run'].conf['yaml_config']
config = yaml.safe_load(yaml_text)
print("Config loaded from YAML:", config)
parse_config = PythonOperator(
task_id='parse_yaml',
python_callable=parse_yaml
)
最后,您将可以使用配置。