我有一个气流 DAG,具有依赖项:start_task >> group1,start_task >> group2。
任务位于“任务”字典中,用于填写 DAG 中的任务组
from airflow import DAG
from datetime import datetime
from airflow.operators.empty import EmptyOperator
from python.etl.dag_param_generator import DAGParamGenerator
from airflow.utils.task_group import TaskGroup
dag_param_generator = DAGParamGenerator('TEST_DINAMIC_DAG')
tasks = {
"task_1": EmptyOperator(task_id="task_1"),
"task_2": EmptyOperator(task_id="task_2"),
"task_3": EmptyOperator(task_id="task_3"),
"task_4": EmptyOperator(task_id="task_4")
}
with DAG(
dag_id=dag_param_generator.get_dag_params('dag_id'),
schedule=dag_param_generator.get_dag_params('schedule'),
start_date=dag_param_generator.get_dag_params('start_date'),
catchup=dag_param_generator.get_dag_params('catchup')
) as dag:
start_task = EmptyOperator(task_id="start_task")
with TaskGroup("group1") as group1:
for key, value in tasks.items():
if key in ['task_1', 'task_2']:
print(value)
globals()[key] = value
with TaskGroup("group2") as group2:
for key, value in tasks.items():
if key in ['task_3', 'task_4']:
print(value)
globals()[key] = value
start_task >> group1
start_task >> group2
如何解决这个问题?
我不知道你所说的
globals()
是什么意思以及为什么你要双重循环。相反,请将任务添加到组中。
with TaskGroup("group1") as group1:
for key in ['task_1', 'task_2']:
group1.append(tasks[key])
with TaskGroup("group2") as group2:
for key in ['task_3', 'task_4']:
group1.append(tasks[key])