气流 - 如何让工人完成所有dag运行任务?

问题描述 投票:1回答:1

我目前正在使用Airflow和Celery来处理文件。工作人员需要下载文件,处理它们并在之后重新上传。我的DAG很好,只有一名工人。但是,当我添加一件事情变得复杂。

工作人员在可用时执行任务。 Worker1可以执行“处理下载文件”的任务,但是执行“下载文件”任务的是Worker2,因此任务失败,因为它无法处理不存在的文件。

有没有办法指定工作者(或调度程序)必须只在一个工作程序上运行DAG?我知道队列。但我已经在使用它们了。

celery airflow
1个回答
0
投票

在这种情况下,您可以使用气流变量来保存所有工作节点名称。例如:

  • 变量:worker_list
  • 价值:boxA, boxB, boxC

运行Airflow工作线程时,您可以指定多个作业队列。例如:airflow worker job_queue1,job_queue2对于你的情况,我会运行airflow worker af_<hostname>

在您的DAG代码中,只需要获取该worker_list Airflow变量,随机选择一个框,然后将所有作业排队到af_<random_selected_box>队列

© www.soinside.com 2019 - 2024. All rights reserved.