气流调度程序失败

问题描述 投票:1回答:2

我已经按照this教程尝试使用我自己的DAG在localhost上构建气流集群。当我在配置文件中设置airflow scheduler后运行executor = CeleryExecutor时,我收到了以下回溯:

Traceback(最近一次调用最后一次):

在args.func(args)中输入文件“/ home / yurii / Tools / anaconda3 / bin / airflow”,第28行

在调度程序job.run()中输入文件“/home/yurii/Tools/anaconda3/lib/python3.6/site-packages/airflow/bin/cli.py”,第839行

文件“/home/yurii/Tools/anaconda3/lib/python3.6/site-packages/airflow/jobs.py”,第200行,运行self._execute()

文件“/home/yurii/Tools/anaconda3/lib/python3.6/site-packages/airflow/jobs.py”,第1309行,在_execute self._execute_helper(processor_manager)中

文件“/home/yurii/Tools/anaconda3/lib/python3.6/site-packages/airflow/jobs.py”,第1441行,在_execute_helper中self.executor.heartbeat()

文件“/home/yurii/Tools/anaconda3/lib/python3.6/site-packages/airflow/executors/base_executor.py”,第124行,心跳中的self.execute_async(key,command = command,queue = queue)

文件“/home/yurii/Tools/anaconda3/lib/python3.6/site-packages/airflow/executors/celery_executor.py”,第80行,在execute_async args = [command],queue = queue)

文件“/home/yurii/Tools/anaconda3/lib/python3.6/site-packages/celery/app/task.py”,第573行,在apply_async ** dict(self._get_exec_options(),** options)

文件“/home/yurii/Tools/anaconda3/lib/python3.6/site-packages/celery/app/base.py”,第354行,在send_task reply_to = reply_to或self.oid,** options

在pub_task ** kwargs中输入文件“/home/yurii/Tools/anaconda3/lib/python3.6/site-packages/celery/app/amqp.py”,第310行

文件“/home/yurii/Tools/anaconda3/lib/python3.6/site-packages/kombu/messaging.py”,第172行,发布routing_key,强制,立即,交换,声明)

文件“/home/yurii/Tools/anaconda3/lib/python3.6/site-packages/kombu/connection.py”,第449行,在_ensured return fun(* args,** kwargs)

文件“/home/yurii/Tools/anaconda3/lib/python3.6/site-packages/kombu/messaging.py”,第188行,在_publish mandatory = mandatory,immediate = immediate,

文件“/home/yurii/Tools/anaconda3/lib/python3.6/site-packages/librabbitmq/init.py”,第122行,basic_publish必填或False,立即或错误,

TypeError:需要一个整数(获取类型NoneType)

一些其他信息:

  • 我使用Airflow 1.8.0以及Celery 3.1.25和RabbitMQ 3.5.7作为经纪人和后端,但也尝试使用Ceflow 4.2的Airflow 1.9.0。
  • 具有顺序执行器的气流工作没有任何问题。
  • `airflow test“dag_name”“task_name”“exec_date”成功运行。

我是Airflow / Celery / RabbitMQ / SQL的新手,所以任何帮助都将不胜感激!

celery airflow-scheduler
2个回答
3
投票

您似乎使用librabbitmq作为amqp代理,而芹菜核心团队并不推荐这样做。使用py-amqp作为rabbitmq代理,你应该摆脱这个错误。


3
投票

添加到上一个答案。使用py-amqp涉及从broker_url = amqp://XXXXX更改为broker_url = pyamqp://XXXXXpip uninstall librabbitmq

此外,您可能需要在celery_result_backend中将result_backend变量更改为airflow.cfg。在最近版本的celery_[celery]节点中的变量已删除airflow.cfg前缀。

© www.soinside.com 2019 - 2024. All rights reserved.