为什么即使分支操作符返回其任务 ID,此 DAG 也会跳过任务?

问题描述 投票:0回答:1

我在 Airflow 中有以下 DAG:

    validate_and_prepare_config >> skip_detect_task >> [ingest, detect]
    detect >> export
    ingest >> skip_decrypt_task >> [decrypt, parse]
    decrypt >> parse >> vault_transfer >> skip_export_task >> [export, get_pipeline_state]
    export >> get_pipeline_state >> post_process

在 Airflow UI 中看起来像这样:

期望的行为是从

vault-transfer
分支到任一

1.

export
,然后
get_pipeline_state

2.直接到

get_pipeline_state

skip_export
任务包含一个分支运算符,它返回“
export
”或“
get_pipeline_state
”。当它返回“
get_pipeline_state
”时,它会按预期分支到
get_pipeline_state
任务。然而,当它返回“
export
”时,DAG仍然跳过
export
任务,仍然分支到
get_pipeline_state

这是我在

skip_export
任务的日志中看到的内容:

{python.py:177} INFO - Done. Returned value was: export
{python.py:211} INFO - Branch callable return export
{skipmixin.py:155} INFO - Following branch export
{skipmixin.py:211} INFO - Skipping tasks []

这些是我在

skip_export
任务中看到的 XCOM 值:

价值
返回值 出口
跳过mixin_key {'已关注': ['get_pipeline_state', '导出']}

为什么会发生这种情况以及如何让

skip_export
任务按预期运行?

python airflow etl directed-acyclic-graphs
1个回答
0
投票

这是因为默认的触发规则设置为all_success,这意味着只有当其所有前置任务都成功时才会执行该任务。

您的案例如下:

exportdetectskip_export 的下游。 export任务将显示为已跳过,因为其trigger_rule默认设置为all_success,并且由分支操作引起的跳过(在detect上)向下级联以跳过标记为all_success的任务。

在导出任务上添加

trigger_rule=none_failed_min_one_success
以使其正常工作。

老实说,我不确定你的

get_pipeline_state
任务 - 理论上它应该有同样的问题,但从图片来看似乎工作正常。为了确定起见,您可以仔细检查有或没有调整触发规则的情况。

© www.soinside.com 2019 - 2024. All rights reserved.