考虑以下tasks.py
模块(改编自http://docs.celeryproject.org/en/latest/getting-started/first-steps-with-celery.html#first-steps):
import logging
import sys
from celery import Celery
app = Celery('tasks', broker='pyamqp://guest@localhost//')
@app.task
def add(x, y):
logging.info(f"Adding {x} and {y}...")
return x + y
def call_add(x, y):
add.delay(x, y)
在同一目录中,我有一个test_tasks.py
测试模块,可以读取
from unittest.mock import patch
import tasks
@patch('logging.info')
def test_adder(info_mock):
tasks.call_add(1, 2)
info_mock.assert_not_called()
这个测试通过(如果我用pytest test_tasks.py
运行它),但我不确定为什么没有调用info_mock
?我希望以下断言能够通过
info_mock.assert_called_with("Adding 1 and 2...")
为什么在这个例子中没有通过logging.info
调用tasks.call_add()
?在我看来,它等同于http://docs.celeryproject.org/en/latest/userguide/testing.html中给出的例子。
确保在运行单元测试时直接在同一进程中运行测试。
Celery使得在运行任务“同步”时保持相同的API非常简单,并跳过损坏的/工作者部分。
app = Celery('tasks', broker='pyamqp://guest@localhost//', task_always_eager=True)
http://docs.celeryproject.org/en/latest/userguide/configuration.html#task-always-eager