我们如何使用TriggerDagRunOperator触发多个气流dags?

问题描述 投票:0回答:6

我有一个场景,其中特定的 dag 完成后需要触发多个 dag,已使用 TriggerDagRunOperator 触发单个 dag,是否可以将多个 dag 传递给 TriggerDagRunOperator 来触发多个 dags?

是否可以仅在成功完成当前 dag 时触发。

directed-acyclic-graphs airflow
6个回答
15
投票

我也遇到过同样的问题。没有现成的解决方案,但我们可以为其编写一个自定义运算符。

这里是自定义运算符的代码,它获取

python_callable
trigger_dag_id
作为参数:

class TriggerMultiDagRunOperator(TriggerDagRunOperator):

    @apply_defaults
    def __init__(self, op_args=None, op_kwargs=None, *args, **kwargs):
        super(TriggerMultiDagRunOperator, self).__init__(*args, **kwargs)
        self.op_args = op_args or []
        self.op_kwargs = op_kwargs or {}

    def execute(self, context):
        session = settings.Session()
        created = False
        for dro in self.python_callable(context, *self.op_args, **self.op_kwargs):
            if not dro or not isinstance(dro, DagRunOrder):
                break

            if dro.run_id is None:
                dro.run_id = 'trig__' + datetime.utcnow().isoformat()

            dbag = DagBag(settings.DAGS_FOLDER)
            trigger_dag = dbag.get_dag(self.trigger_dag_id)
            dr = trigger_dag.create_dagrun(
                run_id=dro.run_id,
                state=State.RUNNING,
                conf=dro.payload,
                external_trigger=True
            )
            created = True
            self.log.info("Creating DagRun %s", dr)

        if created is True:
            session.commit()
        else:
            self.log.info("No DagRun created")
        session.close()

trigger_dag_id
是我们想要多次运行的 dag id。

python_callable
是一个函数,它应该返回一组
DagRunOrder
对象,一个对象用于调度具有 dag_id
trigger_dag_id
的 DAG 实例。

GitHub 上的代码和示例:https://github.com/mastak/airflow_multi_dagrun 关于此代码的更多描述:https://medium.com/@igorlubimov/dynamic-scheduling-in-airflow-52979b3e6b13


2
投票

在 Airflow 2 中,您可以进行动态任务映射。例如:

import uuid
import random
from airflow.decorators import dag, task
from airflow.operators.trigger_dagrun import TriggerDagRunOperator


dag_args = {
    "start_date": datetime(2022, 9, 9),
    "schedule_interval": None,
    "catchup": False,
}

@task
def define_runs():
    num_runs = random.randint(3, 5)
    runs = [str(uuid.uuid4()) for _ in range(num_runs)]
    return runs


@dag(**dag_args)
def dynamic_tasks():

    runs = define_runs()
    run_dags = TriggerDagRunOperator.partial(
        task_id="run_dags",
        trigger_dag_id="hello_world",
        conf=None,
    ).expand(
        trigger_run_id=runs,
    )

    run_dags

dag = dynamic_tasks()

文档这里


1
投票

你可以尝试循环播放!例如:

for i in list:

trigger_dag =TriggerDagRunOperator(task_id='trigger_'+ i, 
                                trigger_dag_id=i,
                                python_callable=conditionally_trigger_non_indr,
                                dag=dag)

根据所需任务进行设置。我已经为 PythonOperator 实现了类似的自动化操作。如果这对你有用的话你可以尝试一下!


0
投票

API 文档 所述,该方法接受单个 dag_id。但是,如果您想在完成后无条件启动下游 DAG,为什么不将这些任务放在单个 DAG 中并在那里设置依赖项/工作流程呢?然后您就可以在适当的地方设置

depends_on_past=True

编辑:如果您绝对需要在单独的 DAG 中使用它们,那么简单的解决方法是创建多个 TriggerDagRunOperator 并将它们的依赖项设置为同一任务。


0
投票

扩展https://stackoverflow.com/users/14647868/matias-lopez回复。如果您需要动态工资:

例如:

run_dags = TriggerDagRunOperator.partial(
    task_id='test_07_few_opt_ins_triggered_dag',
    trigger_dag_id='test_07_few_opt_ins_triggered_dag',
).expand(
    conf=[{"line": "1"}, {"line": "2"}, {"line": "3"}]
)

上面我们有 3 次运行,我们需要设置

expand
用相同数量的“运行”填充conf。

然后,在触发的DAG中:

@task
def start(dag_run=None):
    print(f"consuming line {dag_run.conf.get('line')}")

start()

0
投票

@dag(**dag_args) defdynamic_tasks():

runs = define_runs()
run_dags = TriggerDagRunOperator.partial(
    task_id="run_dags",
    trigger_dag_id="hello_world",
    conf=None,
).expand(
    trigger_run_id=runs,
)

我们可以通过 xcom_pull 来获取扩展函数中上一个任务的运行吗(trigger_run_id = {{ti.xcom_pull(task_ids='t3', key='return_value')}})

© www.soinside.com 2019 - 2024. All rights reserved.