感谢您阅读这个问题。
我按照官方说明此处完成了设置 Airflow 集群,并成功添加了托管在远程计算机上的工作人员。似乎一切都工作正常(与 Redis 和 Postgre 的连接正常工作,并且 DAG 任务在不同的工作人员之间正确分配和执行)。
此外,我还可以通过将每个工作线程订阅到一个独占队列并对 DAG 中每个运算符的
queue
参数进行硬编码来在特定工作线程上执行 DAG。
我的问题是我希望能够在“触发 DAG”按钮的帮助下通过 Airflow CLI 参数化所述执行队列。我尝试使用 Jinja Templates 和 XComs,但这些选项并没有帮助我解决问题,因为 Jinja Templates 似乎不适用于 Operators 的参数,而 XCom 需要
ti
参数或 Jinja Templates。
我知道有些人编写了插件来简化此任务,但由于我找到的所有信息都是在 Airflow 2.x 之前,我想知道是否已经有针对此问题的解决方案。
提前非常感谢您
但是,
task_test = BashOperator(
task_id='test2',
queue='{{ dag_run.conf['queue'] }}',
bash_command="echo hi",
)
不起作用,因为作业在
{{ dag_run.conf['queue'] }}
而不是 queue1
排队
我也尝试了以下方法,但也不起作用,因为 作业被安排在
default
队列上:
with DAG(
'queue_execution_test',
default_args=default_args,
description='Test specific execution.',
schedule_interval=None, # Para que solo se ejecute on demand
start_date=days_ago(2),
) as dag:
run_on_queue = 'default' #'{{ dag_run.conf["queue"] if dag_run else "default" }}'
def parse_queue_parameter(ti, **kwargs): # Task instance for XCom and kwargs for dagrun parameters
try:
ti.xcom_push(key='custom_queue', value= kwargs['dag_run'].conf['queue'] )
except:
print("No queue defined")
initial_task = PythonOperator(
task_id='test1',
queue=run_on_queue,
provide_context=True,
python_callable=parse_queue_parameter,
)
task_test = BashOperator(
task_id='test2',
queue=run_on_queue,
bash_command="echo hi",
)
initial_task.set_downstream(task_test)
@mathias:我也有相同的用例。请告诉我是否有解决方案。