Django模型上的芹菜任务

问题描述 投票:3回答:1

我正在尝试学习如何使用芹菜来检查我的一个模特每天的日期。我的一个模型包含一个到期日期和一个布尔字段,表明他们的保险是否已过期。

该模型非常大,所以我将发布一个精简版本。我想我有两个选择。在模型方法上运行芹菜任务或在tasks.py中重写函数。然后我需要使用芹菜节拍来运行日程安排每天检查。

我有功能,但我直接传递模型对象,我认为这是错误的。

我也在如何在我的celery.py中使用celery beat scheduler中的args时遇到麻烦。

我真的很接近这个工作,但我想我会以错误的方式执行任务。我认为在模型方法上执行任务可能是最干净的,我只是不确定如何实现。

models.朋友

class CarrierCompany(models.Model):
    name = models.CharField(max_length=255, unique=True)
    insurance_expiration = models.DateTimeField(null=True)
    insurance_active = models.BooleanField()

    def insurance_expiration_check(self):
        if self.insurance_expiration > datetime.today().date():
            self.insurance_active = True
            self.save()
            print("Insurance Active")
        else:
            self.insurance_active = False
            self.save()
            print("Insurance Inactive")

tasks.朋友

from __future__ import absolute_import, unicode_literals
from celery.decorators import task
from datetime import datetime, date
from django.utils import timezone
from .models import CarrierCompany



@task(name="insurance_expired")
def insurance_date():
    carriers = CarrierCompany.objects.all()
    for carrier in carriers:
        date = datetime.now(timezone.utc)
        if carrier.insurance_expiration > date:
            carrier.insurance_active = True
            carrier.save()
            print("Insurance Active")
        else:
            carrier.insurance_active = False
            carrier.save()
            print("Insurance Inactive")

celery.朋友

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
from celery.schedules import crontab

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'config.settings.local')

app = Celery('POTRTMS')

app.config_from_object('django.conf:settings', namespace='CELERY')

app.autodiscover_tasks()


@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))


app.conf.beat_schedule = {
    'check-insurance-daily': {
        'task': 'insurance_expired',
        'schedule': crontab(hour='8')
    },
}

***更新节拍时间表以反映我何时想要运行它。

django celery celery-task celerybeat
1个回答
2
投票

我如何做到这一点的一个例子如下。此外,如果您在Django App中包含时区,而不是使用传统的日期时间,您可能希望使用此处找到的时区库。

models.朋友

class CarrierCompany(models.Model):
    ...

    @property
    def is_insurance_expired(self):
        from django.utils import timezone
        if self.insurance_expiration > timezone.datetime.today():
            print("Insurance Active")
            return True
        else:
            print("Insurance Active")
            return False

tasks.朋友

def insurance_date():
    carriers = CarrierCompany.objects.all()
    for carrier in carriers:
        if carrier.is_insurance_expired:
            carrier.insurance_active = True
            carrier.save()
        else:
            carrier.insurance_active = False
            carrier.save()

您还可以执行其他操作,例如,如果False为False则不更新,默认为False,反之亦然。您也可以在模型函数中完成所有这些操作,但我个人希望保持逻辑分离(我只是如何组织我的东西)。希望这可以帮助你度过难关的地方。

© www.soinside.com 2019 - 2024. All rights reserved.