我的环境中有两个 celery 应用程序:
app1 = Celery('app1', broker=BROKER_URL1, backend=BROKER_URL1)
app2 = Celery('app2', broker=BROKER_URL2, backend=BROKER_URL2)
从
web
容器内的 Django 应用程序中,我需要调用一系列任务:
task1
由 app1
task2
由 app2
这两个应用程序具有不同的虚拟环境/Python版本,因此无法通过相同的
app
执行这两个任务。
@app1.task(bind=False)
def task1():
return {"key": "value"}
@app2.task(bind=False)
def task2(result):
return result["key"]
task_chain = (
app1.signature('app1.task1') |
app2.signature('app2.task2')
)
然后我尝试测试链是否正常工作 - 任务 1 似乎执行正确,但任务 2 出现错误:
@override_settings(CELERY_TASK_ALWAYS_EAGER=True)
def test_chain(self):
task_chain.apply_async().get() # Error: celery.exceptions.NotRegistered: 'app2.task2'
我如何测试这条链?
您需要有相同的结果后端才能首先跨多个应用程序运行链。即使使用相同的后端,这两个任务也将由与 app1 关联的工作线程运行。 Celery 链在运行时似乎不遵循与任务关联的应用程序。检查这个:https://github.com/celery/celery/issues/3350