请参考下面的代码片段。我正在尝试按照图 1 创建 DAG,但是我的代码正在创建如图 2 所示的 DAG。请告知。
代码:
task_list = [1, 2, 3, 4, 5, 6, 7]
parallel_task_per_grp = 3
task_grp_cnt = math.ceil(len(task_list)/parallel_task_per_grp)
task_grp_names = [f"grp_{tgc}" for tgc in range(task_grp_cnt)]
with DAG(
dag_id=dag_name_id,
default_args=default_args,
start_date=start_date,
schedule_interval=None, # mi hr dy mn wd - as per cron
tags=["adhoc_dag_ing_conf_sys_profile_col_upd"],
catchup=False
) as dag:
start = BashOperator(task_id="start", bash_command='echo "starting batch processing"', do_xcom_push=False)
stop = BashOperator(task_id="stop", bash_command='echo "stopping batch processing"', trigger_rule=TriggerRule.NONE_SKIPPED, do_xcom_push=False)
start_pos, stop_pos = 0, parallel_task_per_grp
task_group = []
for task_grp_name in task_grp_names:
with TaskGroup(group_id=task_grp_name) as tg1:
[DummyOperator(task_id=f"task_{tl}") for tl in task_list[start_pos:stop_pos]]
task_group.append(tg1)
start_pos, stop_pos = start_pos+parallel_task_per_grp, stop_pos+parallel_task_per_grp
start >> task_group >> stop
图1
图2
start_pos, stop_pos = 0, parallel_task_per_grp
task_group = []
for task_grp_name in task_grp_names:
with TaskGroup(group_id=task_grp_name) as tg1:
[DummyOperator(task_id=f"task_{tl}") for tl in task_list[start_pos:stop_pos]]
task_group.append(tg1)
start_pos, stop_pos = start_pos+parallel_task_per_grp, stop_pos+parallel_task_per_grp
chain(*task_group)
start_processing >> task_group[0]
task_group[-1] >> stop_processing