Airflow - 如何定义 DummyOperators 和任务组之间的任务依赖关系 - 使用装饰器语法

问题描述 投票:0回答:1

我正在浏览 Airflow 文档和来自 https://docs.astronomer.io/learn/managing-dependency?tab=taskflow#dependency-in-dynamic-task-mapping 的文档,以尝试 Airflow 2 附带的新功能。目前,我正在测试任务依赖性,但我对如何使用任务组有点困惑。

我正在尝试创建一个简单的 DAG,其中我想将 TaskGroup 任务依赖项与组外部的任务相结合,如下例所示:

from datetime import datetime

from airflow.decorators import dag, task_group
from airflow.models.baseoperator import chain
from airflow.models.dag import DagContext
from airflow.operators.dummy import DummyOperator
from airflow.providers.ssh.operators.ssh import SSHOperator


@dag(
    start_date=datetime(2021, 4, 20, 15, 0),
    concurrency=5,
    max_active_runs=1,
    catchup=False,
    schedule_interval="0 17 * * *"
)
def sv_load():
    start = DummyOperator(task_id="start")
    end = DummyOperator(task_id="end")

    @task_group(group_id="tg_test")
    def tg1():
        t1 = DummyOperator(task_id="task1")
        t2 = DummyOperator(task_id="task2")

        t1 >> t2


    start >> tg1() >> end

dag = sv_load()

但是这段代码给了我一个错误:

AttributeError:“NoneType”对象没有属性“update_relative”

这段代码有什么问题? 如何将任务组中的任务与组外的任务之间的任务依赖关系结合起来? 我明确想要使用任务装饰器(TaskFlow API)语法。

python airflow task airflow-2.x airflow-taskflow
1个回答
0
投票

没有使用

@task_group
装饰器。您可以使用
TaskGroup
类。这是一个例子:

from datetime import datetime

from airflow.decorators import dag
from airflow.operators.dummy import DummyOperator
from airflow.utils.task_group import TaskGroup


@dag(
    start_date=datetime(2021, 4, 20, 15, 0),
    concurrency=5,
    max_active_runs=1,
    catchup=False,
    schedule_interval='0 17 * * *'
)
def sv_load():
    start = DummyOperator(task_id='start')
    end = DummyOperator(task_id='end')

    with TaskGroup('tg_test', tooltip='Tasks for inner_section2') as tg_test:
        t1 = DummyOperator(task_id='task1')
        t2 = DummyOperator(task_id='task2')

        t1 >> t2

    start >> tg_test >> end


dag = sv_load()
© www.soinside.com 2019 - 2024. All rights reserved.