我已经按照this教程尝试使用我自己的DAG在localhost上构建气流集群。当我在配置文件中设置airflow scheduler
后运行executor = CeleryExecutor
时,我收到了以下回溯:
Traceback(最近一次调用最后一次):
在args.func(args)中输入文件“/ home / yurii / Tools / anaconda3 / bin / airflow”,第28行
在调度程序job.run()中输入文件“/home/yurii/Tools/anaconda3/lib/python3.6/site-packages/airflow/bin/cli.py”,第839行
文件“/home/yurii/Tools/anaconda3/lib/python3.6/site-packages/airflow/jobs.py”,第200行,运行self._execute()
文件“/home/yurii/Tools/anaconda3/lib/python3.6/site-packages/airflow/jobs.py”,第1309行,在_execute self._execute_helper(processor_manager)中
文件“/home/yurii/Tools/anaconda3/lib/python3.6/site-packages/airflow/jobs.py”,第1441行,在_execute_helper中self.executor.heartbeat()
文件“/home/yurii/Tools/anaconda3/lib/python3.6/site-packages/airflow/executors/base_executor.py”,第124行,心跳中的self.execute_async(key,command = command,queue = queue)
文件“/home/yurii/Tools/anaconda3/lib/python3.6/site-packages/airflow/executors/celery_executor.py”,第80行,在execute_async args = [command],queue = queue)
文件“/home/yurii/Tools/anaconda3/lib/python3.6/site-packages/celery/app/task.py”,第573行,在apply_async ** dict(self._get_exec_options(),** options)
文件“/home/yurii/Tools/anaconda3/lib/python3.6/site-packages/celery/app/base.py”,第354行,在send_task reply_to = reply_to或self.oid,** options
在pub_task ** kwargs中输入文件“/home/yurii/Tools/anaconda3/lib/python3.6/site-packages/celery/app/amqp.py”,第310行
文件“/home/yurii/Tools/anaconda3/lib/python3.6/site-packages/kombu/messaging.py”,第172行,发布routing_key,强制,立即,交换,声明)
文件“/home/yurii/Tools/anaconda3/lib/python3.6/site-packages/kombu/connection.py”,第449行,在_ensured return fun(* args,** kwargs)
文件“/home/yurii/Tools/anaconda3/lib/python3.6/site-packages/kombu/messaging.py”,第188行,在_publish mandatory = mandatory,immediate = immediate,
文件“/home/yurii/Tools/anaconda3/lib/python3.6/site-packages/librabbitmq/init.py”,第122行,basic_publish必填或False,立即或错误,
TypeError:需要一个整数(获取类型NoneType)
一些其他信息:
我是Airflow / Celery / RabbitMQ / SQL的新手,所以任何帮助都将不胜感激!
您似乎使用librabbitmq作为amqp代理,而芹菜核心团队并不推荐这样做。使用py-amqp作为rabbitmq代理,你应该摆脱这个错误。
添加到上一个答案。使用py-amqp涉及从broker_url = amqp://XXXXX
更改为broker_url = pyamqp://XXXXX
或pip uninstall librabbitmq
。
此外,您可能需要在celery_result_backend
中将result_backend
变量更改为airflow.cfg
。在最近版本的celery_
中[celery]
节点中的变量已删除airflow.cfg
前缀。