通过Airflow的UI TriggerDag参数指定DAG队列

问题描述 投票:0回答:1

感谢您阅读这个问题。

我按照官方说明此处完成了设置 Airflow 集群,并成功添加了托管在远程计算机上的工作人员。似乎一切都工作正常(与 Redis 和 Postgre 的连接正常工作,并且 DAG 任务在不同的工作人员之间正确分配和执行)。

此外,我还可以通过将每个工作线程订阅到一个独占队列并对 DAG 中每个运算符的

queue
参数进行硬编码来在特定工作线程上执行 DAG。

我的问题是我希望能够在“触发 DAG”按钮的帮助下通过 Airflow CLI 参数化所述执行队列。我尝试使用 Jinja Templates 和 XComs,但这些选项并没有帮助我解决问题,因为 Jinja Templates 似乎不适用于 Operators 的参数,而 XCom 需要

ti
参数或 Jinja Templates。

我知道有些人编写了插件来简化此任务,但由于我找到的所有信息都是在 Airflow 2.x 之前,我想知道是否已经有针对此问题的解决方案。

提前非常感谢您

编辑:我想这样做:Triggering DAG via UI

但是,

        task_test = BashOperator(
        task_id='test2',
        queue='{{ dag_run.conf['queue'] }}',
        bash_command="echo hi",
        )

不起作用,因为作业在

{{ dag_run.conf['queue'] }}
而不是
queue1

排队

我也尝试了以下方法,但也不起作用,因为 作业被安排在

default
队列上:

    with DAG(
        'queue_execution_test',
        default_args=default_args,
        description='Test specific execution.',
        schedule_interval=None, # Para que solo se ejecute on demand
        start_date=days_ago(2),
    ) as dag:

    run_on_queue = 'default' #'{{ dag_run.conf["queue"] if dag_run else "default" }}'

    def parse_queue_parameter(ti, **kwargs): # Task instance for XCom and kwargs for dagrun parameters
        try:
            ti.xcom_push(key='custom_queue', value= kwargs['dag_run'].conf['queue'] )
        except:
            print("No queue defined")

    initial_task = PythonOperator(
        task_id='test1',
        queue=run_on_queue,
        provide_context=True,
        python_callable=parse_queue_parameter,
    )


    task_test = BashOperator(
        task_id='test2',
        queue=run_on_queue,
        bash_command="echo hi",
    )

    initial_task.set_downstream(task_test)
celery airflow
1个回答
0
投票

@mathias:我也有相同的用例。请告诉我是否有解决方案。

© www.soinside.com 2019 - 2024. All rights reserved.