如何在Django中使用Celery根据模型字段来调度任务并动态更新?

问题描述 投票:0回答:1

我在使用 Django 开发应用程序并使用 Celery 基于模型字段安排任务时遇到了挑战。我想分享我的解决方案并寻求有关潜在改进的建议。作为一个相对缺乏经验的开发人员,我欢迎任何指导。

我正在开发一个用于管理培训课程的 Django 应用程序,我需要在注册截止日期到来时安排任务。我已经实现了 Celery 来进行任务调度,并将任务 ID 存储在模型字段中,以便在必要时进行撤销。我的挑战是确保当注册截止日期更改时,如果截止日期已过,计划的任务也会更新或执行。

from datetime import datetime
from celery.result import AsyncResult
from django.db import transaction
from django.db.models.signals import post_save, pre_delete, pre_save
from django.dispatch import receiver
from django.shortcuts import get_object_or_404
from django.utils import timezone
from .models import Training
from .tasks import post_registration_deadline


def delay_and_assign_registration_deadline_task(training_id):
    """
    The post_registration_deadline notifies the members of the training manager group if the minimum number of
    candidates is not reached. If there is still some registrations to review, they are canceled with the reason
    "REGISTRATION_DEADLINE_PASSED".
    """
    training = get_object_or_404(Training, id=training_id)
    task_id = post_registration_deadline.delay(training.id).task_id
    training.registration_deadline_task = task_id
    training.save()

@receiver(pre_save, sender=Training)
def pre_save_training(sender, instance, **kwargs):
    if instance.type.registration_enabled:

        """
        if the training is already created, we check if the registration deadline has changed
        if it has, we revoke the previous task. If the new deadline is now or before, we execute the task.
        If not, we schedule it for the new deadline date.
        """

        if instance.pk:
            previous = get_object_or_404(Training, id=instance.id)

            if previous.registration_deadline != instance.registration_deadline:
                task = AsyncResult(str(getattr(instance, 'registration_deadline_task')))
                task.revoke()

                if instance.registration_deadline <= timezone.now().date():
                    transaction.on_commit(lambda: delay_and_assign_registration_deadline_task(instance.id))
                else:
                    eta_time = datetime.combine(
                        instance.registration_deadline, 
                        datetime.max.time(), 
                        tzinfo=timezone.utc
                    )
                    instance.registration_deadline_task = post_registration_deadline.apply_async(
                        eta=eta_time, args=[instance.id]
                    ).task_id


@receiver(post_save, sender=Training)
def post_save_training(sender, instance, created, **kwargs):
    """
    if, at training creation, the registration deadline is today or before, we start the
    post_registration_deadline_task if not, we schedule it for the registration deadline date.
    """

    if created:
        if instance.type.registration_enabled:

            if instance.registration_deadline <= timezone.now().date():
                transaction.on_commit(lambda: delay_and_assign_registration_deadline_task(instance.id))
            else:
                eta_time = datetime.combine(
                     instance.registration_deadline, 
                     datetime.max.time(), 
                     tzinfo=timezone.utc
                )
                instance.registration_deadline_task = post_registration_deadline.apply_async(
                    eta=eta_time, args=[instance.id]
                ).task_id


@receiver(pre_delete, sender=Training)
def pre_delete_training(sender, instance, **kwargs):
    registration_deadline_task = AsyncResult(str(getattr(instance, 'registration_deadline_task')))
    registration_deadline_task.revoke()
django celery task scheduling
1个回答
0
投票

考虑到任务日期将来可能会发生变化,使用周期性任务通常更灵活且更易于管理。原因如下:

定期任务:

优点:

  1. 灵活性:您可以轻松更新定期任务的时间表,而无需取消和重新安排它。
  2. 管理:如果你使用的是Django-Celery-Beat这样的工具,你可以通过Django管理界面来管理任务计划,这样更新起来很方便。
  3. 重复性:定期任务是为重复性任务而设计的,如果您需要该功能。

缺点:

  1. 复杂性:设置周期性任务最初可能会稍微复杂一些。

使用
delay

优点:

  1. 简单:初始设置更容易。

缺点:

  1. 不灵活:如果日期发生变化,您必须手动取消任务并重新安排它,这可能很麻烦且容易出错。

建议:

如果使用
delay

您需要跟踪

apply_async()
返回的任务 ID,并在日期发生变化时使用它来撤销任务。然后,您可以使用新日期重新安排时间。

from celery.task.control import revoke
# To cancel a task
revoke(task_id, terminate=True)

如果使用定期任务:

您只需更新时间表即可。如果您使用 Django-Celery-Beat,这可以通过 Django 管理界面或以编程方式完成。

数据库存储:

将任务详细信息(包括其预期执行日期)存储在数据库中。 添加一个字段来指示任务是否已执行。

定期任务:

创建定期运行的任务(例如,每分钟或每小时)。 在此任务中,根据预期执行日期以及是否已执行,在数据库中查询应执行的任何任务。 执行符合条件的任务并在数据库中更新其状态。

示例:您可以参考示例这里

这种方法使您可以通过简单地更新数据库中的execute_at字段来灵活地更改执行日期。定期任务将在下次运行时获取此更改。

如果上述方法有问题或有任何疑问,请告诉我

© www.soinside.com 2019 - 2024. All rights reserved.