如何在 Airflow 中创建多个包含并行任务的任务组

问题描述 投票:0回答:1

请参考下面的代码片段。我想创建如图 1 所示的 DAG,但我的代码生成的却是图 2 所示的 DAG。请问应该如何修改?

代码:

# Tasks to distribute across task groups, and the number of tasks per group.
task_list = [1, 2, 3, 4, 5, 6, 7]
parallel_task_per_grp = 3
# Number of groups needed: ceil(7 / 3) == 3, so the last group holds the remainder (task 7).
task_grp_cnt = math.ceil(len(task_list)/parallel_task_per_grp)
task_grp_names = [f"grp_{tgc}" for tgc in range(task_grp_cnt)]

# NOTE: math, DAG, BashOperator, TriggerRule, TaskGroup, DummyOperator,
# dag_name_id, default_args and start_date are imported/defined outside this snippet.
with DAG(
        dag_id=dag_name_id,
        default_args=default_args,
        start_date=start_date,
        schedule_interval=None,    # mi hr dy mn wd - as per cron
        tags=["adhoc_dag_ing_conf_sys_profile_col_upd"],
        catchup=False
) as dag:
    start = BashOperator(task_id="start", bash_command='echo "starting batch processing"', do_xcom_push=False)
    # NONE_SKIPPED: "stop" runs only if none of its upstream tasks was skipped.
    stop = BashOperator(task_id="stop", bash_command='echo "stopping batch processing"', trigger_rule=TriggerRule.NONE_SKIPPED, do_xcom_push=False)

    # [start_pos, stop_pos) is the slice of task_list placed into the current group.
    start_pos, stop_pos = 0, parallel_task_per_grp
    task_group = []
    for task_grp_name in task_grp_names:
        with TaskGroup(group_id=task_grp_name) as tg1:
            # Operators created inside the TaskGroup context belong to that group; no
            # dependencies are set between them, so they run in parallel within the group.
            [DummyOperator(task_id=f"task_{tl}") for tl in task_list[start_pos:stop_pos]]
        task_group.append(tg1)
        # Slide the window forward to the next parallel_task_per_grp tasks.
        start_pos, stop_pos = start_pos+parallel_task_per_grp, stop_pos+parallel_task_per_grp

    # A dependency against a list fans out to every element: each group is downstream
    # of start and upstream of stop, but the groups have no dependencies on one another,
    # so they all run in parallel. This produces the DAG in Figure 2, not Figure 1.
    start >> task_group >> stop

图 1(期望得到的 DAG;原图缺失)

图 2(当前代码实际生成的 DAG;原图缺失)

google-bigquery airflow airflow-2.x
1个回答
0
投票
# Drop-in replacement for the group-building loop and the final
# `start >> task_group >> stop` line inside the question's `with DAG(...) as dag:`
# block (indent it to match that block). `start`, `stop`, `task_list`,
# `parallel_task_per_grp`, `task_grp_names`, TaskGroup and DummyOperator are the
# names defined/imported in the question's code.
# `chain` ships with Airflow 2.x:
from airflow.models.baseoperator import chain

task_group = []
# Offsets 0, parallel_task_per_grp, 2*parallel_task_per_grp, ... -- one per group,
# so this pairs up exactly with the ceil(len/size) entries of task_grp_names.
group_starts = range(0, len(task_list), parallel_task_per_grp)
for task_grp_name, start_pos in zip(task_grp_names, group_starts):
    with TaskGroup(group_id=task_grp_name) as tg1:
        # Tasks in the same group get no dependencies on each other -> they run in parallel.
        # (On Airflow >= 2.4, EmptyOperator is the non-deprecated replacement for DummyOperator.)
        for tl in task_list[start_pos:start_pos + parallel_task_per_grp]:
            DummyOperator(task_id=f"task_{tl}")
    task_group.append(tg1)

# chain(a, b, c) sets a >> b >> c, so the groups run one after another:
# start >> grp_0 >> grp_1 >> ... >> stop. Unlike indexing task_group[0] / task_group[-1],
# this also works when there are no groups (it simply links start >> stop).
chain(start, *task_group, stop)
© www.soinside.com 2019 - 2024. All rights reserved.