Airflow 中的任务分组

问题描述 投票:0回答:1

我有一个气流 DAG,具有依赖项:start_task >> group1,start_task >> group2。

任务位于“任务”字典中,用于填写 DAG 中的任务组

from airflow import DAG
from datetime import datetime
from airflow.operators.empty import EmptyOperator
from python.etl.dag_param_generator import DAGParamGenerator
from airflow.utils.task_group import TaskGroup

dag_param_generator = DAGParamGenerator('TEST_DINAMIC_DAG')
tasks = {
    "task_1": EmptyOperator(task_id="task_1"),
    "task_2": EmptyOperator(task_id="task_2"),
    "task_3": EmptyOperator(task_id="task_3"),
    "task_4": EmptyOperator(task_id="task_4")
}

with DAG(
    dag_id=dag_param_generator.get_dag_params('dag_id'),
    schedule=dag_param_generator.get_dag_params('schedule'),
    start_date=dag_param_generator.get_dag_params('start_date'),
    catchup=dag_param_generator.get_dag_params('catchup')
) as dag:
    start_task = EmptyOperator(task_id="start_task")

    with TaskGroup("group1") as group1:
        for key, value in tasks.items():
            if key in ['task_1', 'task_2']:
                print(value)
                globals()[key] = value
    with TaskGroup("group2") as group2:
        for key, value in tasks.items():
            if key in ['task_3', 'task_4']:
                print(value)
                globals()[key] = value
                
    start_task >> group1
    start_task >> group2

任务不适合分组,我得到以下结果(组为空):

如何解决这个问题?

python python-3.x airflow
1个回答
0
投票

我不知道你所说的

globals()
是什么意思以及为什么你要双重循环。相反,请将任务添加到组中。

with TaskGroup("group1") as group1:
    for key in ['task_1', 'task_2']:
        group1.append(tasks[key])
with TaskGroup("group2") as group2:
    for key in ['task_3', 'task_4']:
        group1.append(tasks[key])
© www.soinside.com 2019 - 2024. All rights reserved.