我正在尝试为监听 rabbitMQ 实例并根据该消息值执行 celery 作业的消息传递消费者运行脚本。我收到了循环依赖的问题,无法弄清楚到底是什么问题以及如何解决它。以下是我的文件示例:
芹菜.py 导入操作系统 从芹菜进口芹菜
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "projectTest.settings")
app = Celery(
"appTest",
broker=os.environ.get("RABBITMQ_CONN_STRING"),
backend="rpc://",
)
# Optional configuration, see the application user guide.
app.conf.update(
result_expires=3600,
)
app.config_from_object("appTest.celeryConfig", namespace="CELERY")
app.autodiscover_tasks()
if __name__ == "__main__":
app.start()
类任务.py 从芹菜进口应用
logger = get_task_logger("emailSvcCelery")
class ClassTask(app.Task):
name = "class-task"
def run(self):
print('Task Ran!')
app.register_task(ClassTask)
message_consumer.py
import os
import json
import pika
from tasks.class_task import ClassTask
connection_parameters = pika.ConnectionParameters(os.environ.get("RABBITMQ_CONN_STRING"))
connection = pika.BlockingConnection(connection_parameters)
channel = connection.channel()
queues = (
'queue-test'
)
for q in queues:
channel.queue_declare(queue=q, durable=True)
def callback(channel, method, properties, body):
ClassTask().delay()
channel.basic_consume(queue='queue-test', on_message_callback=callback, auto_ack=True)
print("Started Consuming...")
channel.start_consuming()
运行后我收到的错误如下
python3 message_consumer.py
:
Traceback (most recent call last):
File "/message_consumer.py", line 4, in <module>
from tasks.class_task import ClassTask
File "/class_task.py", line 1, in <module>
from celery import app
File "/celery.py", line 2, in <module>
from celery import Celery
ImportError: cannot import name 'Celery' from partially initialized module 'celery' (most likely due to a circular import) (/celery.py)
我对 python 有点陌生,需要一些帮助,谢谢!
不幸的是,这是一个包依赖错误。 Celery 无法初始化,因为它使用了已贬值的包中的方法。 Celery 使用
importlib-metadata
中的接口,该接口在版本 5 中已弃用。使用此命令回滚版本:
pip install importlib-metadata==4.8.3