我在Django 2.2中使用Celery 4.4
我必须创建一个PeriodicTask
,我正在扩展PeriodicTask询问为from celery.schedules import crontab from celery.task import PeriodicTask class IncompleteOrderHandler(PeriodicTask): run_every = crontab( minute='*/{}'.format(getattr(settings, 'INCOMPLETE_ORDER_HANDLER_PULSE', 5)) ) def run(self, *args, **kwargs): # Task definition eligible_users, slot_begin, slot_end = self.get_users_in_last_slot() map(lambda user: self.process_user(user, slot_begin, slot_end), eligible_users)
较早注册上述任务,我曾经打电话给
from celery.registry import tasks tasks.register(IncompleteOrderHandler)
但是现在
registry
中没有celery
模块。如何注册上述定期任务?
[我在Django 2.2上使用Celery 4.4,我必须创建一个Periodic Task,我要从celery.schedules从celery.task import PeriodicTask类中导入crontab来扩展PeriodicTask的要求,...]]
基于类的芹菜任务,我遇到了同样的问题。这必须有效,但事实并非如此!偶然地,我的问题在这两个更改上被每个人解决了:
这是我的基本芹菜设置文件: