当任务由不同的 celery 应用执行时,无法测试 celery 链

问题描述 投票:0回答:1

我的环境中有两个 celery 应用程序:

app1 = Celery('app1', broker=BROKER_URL1, backend=BROKER_URL1)
app2 = Celery('app2', broker=BROKER_URL2, backend=BROKER_URL2)

web
容器内的 Django 应用程序中,我需要调用一系列任务:

  • task1
    app1
  • 执行
  • task2
    app2
  • 执行

这两个应用程序具有不同的虚拟环境/Python版本,因此无法通过相同的

app
执行这两个任务。

@app1.task(bind=False)
def task1():
    return {"key": "value"}

@app2.task(bind=False)
def task2(result):
    return result["key"]

task_chain = (
    app1.signature('app1.task1') |
    app2.signature('app2.task2')
)

然后我尝试测试链是否正常工作 - 任务 1 似乎执行正确,但任务 2 出现错误:

@override_settings(CELERY_TASK_ALWAYS_EAGER=True)
def test_chain(self):
    task_chain.apply_async().get() # Error: celery.exceptions.NotRegistered: 'app2.task2'

我如何测试这条链?

python django testing celery django-celery
1个回答
0
投票

您需要有相同的结果后端才能首先跨多个应用程序运行链。即使使用相同的后端,这两个任务也将由与 app1 关联的工作线程运行。 Celery 链在运行时似乎不遵循与任务关联的应用程序。检查这个:https://github.com/celery/celery/issues/3350

© www.soinside.com 2019 - 2024. All rights reserved.