我目前正在使用Airflow和Celery来处理文件。工作人员需要下载文件,处理它们并在之后重新上传。我的DAG很好,只有一名工人。但是,当我添加一件事情变得复杂。
工作人员在可用时执行任务。 Worker1可以执行“处理下载文件”的任务,但是执行“下载文件”任务的是Worker2,因此任务失败,因为它无法处理不存在的文件。
有没有办法指定工作者(或调度程序)必须只在一个工作程序上运行DAG?我知道队列。但我已经在使用它们了。
在这种情况下,您可以使用气流变量来保存所有工作节点名称。例如:
worker_list
boxA, boxB, boxC
运行Airflow工作线程时,您可以指定多个作业队列。例如:airflow worker job_queue1,job_queue2
对于你的情况,我会运行airflow worker af_<hostname>
在您的DAG代码中,只需要获取该worker_list Airflow变量,随机选择一个框,然后将所有作业排队到af_<random_selected_box>
队列