我有一个与动态任务组一起运行的 dag,当动态任务数量发生变化时该组会失败

问题描述 投票:0回答:1

我有一个运行 8 个任务或 16 个任务的 dag(或进行测试,任何多对最多 8 对)。 当任务组结束时,下一个任务将评估先前任务的结果。 但是,当先前运行(成功)具有不同的任务数量时,由于未知原因,此操作会失败。 当任务少于之前的任务时,幽灵任务会在任务组中显示为失败(它们不是生成的或不需要的),或者当任务较多时,评估任务会失败,因为未满足所有依赖关系。 所以看起来这些依赖关系是从先前的任务中保留下来的。 如果我再次触发 dag,它会正常运行,但是当动态任务的计数发生变化时,它会再次出错。 我可以做什么来解决这个问题? Here is a graph of a failure after the number of tasks is increased from 2 to 4 错误任务没有日志,但任务实例详细信息有: 依赖原因 Dagrun 运行任务实例的 dagrun 不处于“正在运行”状态,而是处于“失败”状态。 任务实例状态 任务处于“失败”状态。 不知道它是如何进入“失败”状态的。 Here is a graph with ghost tasks 无论幽灵任务如何,这个特定的运行都是成功的,一定是我的调整:trigger_rule =“all_done”, 但我仍然不想看到不应该出现的任务的图像。 让我知道你们的想法。

马克

我在第一个任务组之后更改了任务以在 all_done 上触发,这似乎有效。 但当任务数量增加时仍然存在问题,并且仍然存在不应该出现在图表中的幽灵任务。

#这是第一组的生成方式: 将 TaskGroup("group_hadr_status") 作为 group_hadr_status: 上下文 = {} conn = db2connectionhook(database_conn_id="DB_DB2_DBA_DB",**上下文) servers_to_process = conn.execute("从 patchTuesday.servers order by 1 选择服务器") 德尔康恩 ROW_COUNT=0 对于servers_to_process中的服务器: ROW_COUNT+=1 服务器=服务器[0] 日志记录.info(“group_hadr_status服务器=%s”,服务器) 制作动态任务:“get_hadr_status_”+ get_hadr_status 组中的服务器 get_hadr_status_task=PythonOperator( task_id='get_hadr_status_' + 服务器, dag=星期二补丁, execution_timeout = timedelta(秒=300), #300 -> 5 分钟 python_callable=get_hadr_status, 重试= 0, op_args=[服务器], ) 最多游泳泳道 = ROW_COUNT/2 init_patch_tuesday_task.set_downstream(group_hadr_status)

#这是我定义orchestration_task任务的方式: patch_orchestration_task = PythonOperator( task_id='patch_orchestration_id', dag=星期二补丁, python_callable=patch_orchestration, 提供_上下文=真, 触发规则=“全部完成”, on_failure_callback=notify_email, op_args=[]) group_hadr_status.set_downstream(patch_orchestration_task) `

dynamic airflow task
1个回答
0
投票

查看动态任务映射。 Airflow 的动态任务映射允许定义可变数量的任务,这些任务在 DAG 实际运行之前是未知的。我非常喜欢 TaskFlow API(例如使用

@task
),但您也可以 映射“经典”运算符,例如示例中的
PythonOperator

© www.soinside.com 2019 - 2024. All rights reserved.