我正在浏览 Airflow 文档和来自 https://docs.astronomer.io/learn/managing-dependency?tab=taskflow#dependency-in-dynamic-task-mapping 的文档,以尝试 Airflow 2 附带的新功能。目前,我正在测试任务依赖性,但我对如何使用任务组有点困惑。
我正在尝试创建一个简单的 DAG,其中我想将 TaskGroup 任务依赖项与组外部的任务相结合,如下例所示:
from datetime import datetime
from airflow.decorators import dag, task_group
from airflow.models.baseoperator import chain
from airflow.models.dag import DagContext
from airflow.operators.dummy import DummyOperator
from airflow.providers.ssh.operators.ssh import SSHOperator
@dag(
start_date=datetime(2021, 4, 20, 15, 0),
concurrency=5,
max_active_runs=1,
catchup=False,
schedule_interval="0 17 * * *"
)
def sv_load():
start = DummyOperator(task_id="start")
end = DummyOperator(task_id="end")
@task_group(group_id="tg_test")
def tg1():
t1 = DummyOperator(task_id="task1")
t2 = DummyOperator(task_id="task2")
t1 >> t2
start >> tg1() >> end
dag = sv_load()
但是这段代码给了我一个错误:
AttributeError:“NoneType”对象没有属性“update_relative”
这段代码有什么问题? 如何将任务组中的任务与组外的任务之间的任务依赖关系结合起来? 我明确想要使用任务装饰器(TaskFlow API)语法。
没有使用
@task_group
装饰器。您可以使用 TaskGroup
类。这是一个例子:
from datetime import datetime
from airflow.decorators import dag
from airflow.operators.dummy import DummyOperator
from airflow.utils.task_group import TaskGroup
@dag(
start_date=datetime(2021, 4, 20, 15, 0),
concurrency=5,
max_active_runs=1,
catchup=False,
schedule_interval='0 17 * * *'
)
def sv_load():
start = DummyOperator(task_id='start')
end = DummyOperator(task_id='end')
with TaskGroup('tg_test', tooltip='Tasks for inner_section2') as tg_test:
t1 = DummyOperator(task_id='task1')
t2 = DummyOperator(task_id='task2')
t1 >> t2
start >> tg_test >> end
dag = sv_load()