使 Django 测试用例数据库对 Celery 可见

问题描述 投票:0回答:4

当 Django 测试用例运行时,它会创建一个隔离的测试数据库,以便在每个测试完成时回滚数据库写入。我正在尝试使用 Celery 创建集成测试,但我不知道如何将 Celery 连接到这个临时测试数据库。在简单的设置中,保存在 Django 中的对象对于 Celery 是不可见的,并且保存在 Celery 中的对象会无限期地保留。

这是一个示例测试用例:

import json
from rest_framework.test import APITestCase
from myapp.models import MyModel
from myapp.util import get_result_from_response

class MyTestCase(APITestCase):
    @classmethod
    def setUpTestData(cls):
        # This object is not visible to Celery
        MyModel(id='test_object').save()

    def test_celery_integration(self):
        # This view spawns a Celery task
        # Task should see MyModel.objects.get(id='test_object'), but can't
        http_response = self.client.post('/', 'test_data', format='json')

        result = get_result_from_response(http_response)
        result.get()  # Wait for task to finish before ending test case
        # Objects saved by Celery task should be deleted, but persist

我有两个问题:

  1. 如何让 Celery 可以看到 Django 测试用例的对象?

  2. 如何确保Celery保存的所有对象在测试完成后自动回滚?

如果无法自动执行此操作,我愿意手动清理对象,但是即使在

tearDown
中删除
APISimpleTestCase
中的对象似乎也会回滚。

django testing celery
4个回答
34
投票

这可以通过在 Django 测试用例中启动 Celery 工作线程来实现。

背景

Django的内存数据库是sqlite3。正如 Sqlite 内存数据库的描述页面 中所述,“共享内存数据库的所有数据库连接都需要位于同一进程中。”这意味着,只要 Django 使用内存测试数据库,并且 Celery 在单独的进程中启动,那么 Celery 和 Django 根本不可能共享测试数据库。

但是,使用

celery.contrib.testing.worker.start_worker
,可以在同一进程内的单独线程中启动 Celery 工作线程。该工作人员可以访问内存数据库。

这假设 Celery 已经以 通常的方式 与 Django 项目进行了设置。

解决方案

由于 Django-Celery 涉及一些跨线程通信,因此只有不在隔离事务中运行的测试用例才能工作。测试用例必须直接从

SimpleTestCase
或其 Rest 等效项
APISimpleTestCase
继承,并将
databases
设置为
'__all__'
或仅设置与测试交互的数据库。

关键是在

setUpClass
TestCase
方法中启动 Celery Worker,并在
tearDownClass
方法中关闭它。关键函数是
celery.contrib.testing.worker.start_worker
,它需要当前 Celery 应用程序的实例,大概是从
mysite.celery.app
获取的,并返回一个 Python
ContextManager
,它有
__enter__
__exit__
方法,必须在
setUpClass 中调用分别是 
tearDownClass
。可能有一种方法可以避免使用装饰器或其他东西手动输入和存在
ContextManager
,但我无法弄清楚。这是一个示例
tests.py
文件:

from celery.contrib.testing.worker import start_worker
from django.test import SimpleTestCase

from mysite.celery import app

class BatchSimulationTestCase(SimpleTestCase):
    databases = '__all__'

    @classmethod
    def setUpClass(cls):
        super().setUpClass()

        # Start up celery worker
        cls.celery_worker = start_worker(app, perform_ping_check=False)
        cls.celery_worker.__enter__()

    @classmethod
    def tearDownClass(cls):
        super().tearDownClass()

        # Close worker
        cls.celery_worker.__exit__(None, None, None)

    def test_my_function(self):
        # my_task.delay() or something

无论出于何种原因,测试工作人员都会尝试使用名为

'celery.ping'
的任务,可能是为了在工作人员失败的情况下提供更好的错误消息。它正在寻找的任务是
celery.contrib.testing.tasks.ping
,该任务在测试时不可用。将
perform_ping_check
start_worker
参数设置为
False
会跳过对此的检查并避免相关的错误。

现在,运行测试时,无需启动单独的 Celery 进程。 Celery 工作线程将在 Django 测试进程中作为单独的线程启动。该工作人员可以查看任何内存数据库,包括默认的内存测试数据库。要控制工作人员的数量,

start_worker
中有可用的选项,但默认值似乎是单个工作人员。


3
投票

对于您的单元测试,我建议跳过 celery 依赖项,以下两个链接将为您提供启动单元测试所需的信息:

如果你真的想测试包括队列在内的 celery 函数调用,我可能会设置一个包含服务器、工作线程、队列组合的 dockercompose,并从 django-celery 文档扩展自定义 CeleryTestRunner。但我看不到它有什么好处,因为测试系统可能距离生产太远,无法具有代表性。


0
投票

我根据@drhagen 的解决方案找到了另一种解决方法:

致电

celery.contrib.testing.app.TestApp()
 之前先致电 
start_worker(app)

from celery.contrib.testing.worker import start_worker
from celery.contrib.testing.app import TestApp

from myapp.tasks import app, my_task


class TestTasks:
    def setup(self):
        TestApp()
        self.celery_worker = start_worker(app)
        self.celery_worker.__enter__()

    def teardown(self):
        self.celery_worker.__exit__(None, None, None)

0
投票

我找到了一种无需写入主数据库即可测试 celery 任务的有用方法。

可以使用unittest.mock.patch来替换celery功能

import json
from rest_framework.test import APITestCase
from myapp.models import MyModel
from myapp.util import get_result_from_response
from unittest.mock import patch

class MyTestCase(APITestCase):
    @classmethod
    def setUpTestData(cls):
        # This object is not visible to Celery
        MyModel(id='test_object').save()

    @patch('appName.views.yourView.yourCeleryTask.delay', new=yourCeleryTask)
    def test_celery_integration(self):
        # This view spawns a Celery task
        # Task should see MyModel.objects.get(id='test_object'), but can't
        http_response = self.client.post('/', 'test_data', format='json')

        result = get_result_from_response(http_response)
        result.get()  # Wait for task to finish before ending test case
        # Objects saved by Celery task should be deleted, but persist

希望这有帮助,视图本身将在测试时执行任务,而不是调用 celery Worker 来执行任务

© www.soinside.com 2019 - 2024. All rights reserved.