Django芹菜注册定期任务

问题描述 投票:1回答:1

我在Django 2.2中使用Celery 4.4

我必须创建一个PeriodicTask

,我正在扩展PeriodicTask询问为

from celery.schedules import crontab
from celery.task import PeriodicTask

class IncompleteOrderHandler(PeriodicTask):
    run_every = crontab(
        minute='*/{}'.format(getattr(settings, 'INCOMPLETE_ORDER_HANDLER_PULSE', 5))
    )

    def run(self, *args, **kwargs):
        # Task definition
        eligible_users, slot_begin, slot_end = self.get_users_in_last_slot()
        map(lambda user: self.process_user(user, slot_begin, slot_end), eligible_users)

较早注册上述任务,我曾经打电话给

from celery.registry import tasks

tasks.register(IncompleteOrderHandler)

但是现在registry中没有celery模块。如何注册上述定期任务?

[我在Django 2.2上使用Celery 4.4,我必须创建一个Periodic Task,我要从celery.schedules从celery.task import PeriodicTask类中导入crontab来扩展PeriodicTask的要求,...]]

django celery django-celery django-celery-beat
1个回答
0
投票

基于类的芹菜任务,我遇到了同样的问题。这必须有效,但事实并非如此!偶然地,我的问题在这两个更改上被每个人解决了:

    <<
  1. 这是我的基本芹菜设置文件:

© www.soinside.com 2019 - 2024. All rights reserved.