Celery: limit the number of running tasks per user

Problem description

I have a task in Celery that looks like this:

@app.task(name='task_one')
def task_one(user_id, *args, **kwargs):
    # Long running task

The task is created from the views every time a user submits a form; it is resource-intensive and takes about 10 minutes on average to complete.

(views.py)
...
if request.method == 'POST':
    task_one.delay(user.id)
...

I want to limit the number of task_one tasks created per user to one (active or reserved).

What I'm doing so far is checking, before creating the task, whether there is a task already active or reserved for that user:

def user_created_task(active_tasks, reserved_tasks, user_id):
  # active_tasks / reserved_tasks map each worker name to its list of tasks,
  # so walk the tasks reported by every worker, not just the first one
  for worker_tasks in list(active_tasks.values()) + list(reserved_tasks.values()):
    for task in worker_tasks:
      if task['name'] == 'task_one' and task['args'][0] == user_id:
        # There is already a `task_one` active or reserved for this user
        return True

  return False

def user_tasks_already_running_or_reserved(user_id):
  inspect = app.control.inspect()

  active_tasks = inspect.active()
  reserved_tasks = inspect.reserved()

  if active_tasks is None and reserved_tasks is None:
    # Celery workers are disconnected 
    return False

  return user_created_task(active_tasks, reserved_tasks, user_id)


(views.py)
...
if request.method == 'POST':
    if not user_tasks_already_running_or_reserved(user.id):
        task_one.delay(user.id)
...

I'd like to know whether there is a more efficient way of doing this than inspecting all the workers on every user request. Perhaps there is a way to add this condition on the Celery side before the task runs, but so far I haven't found anything in the documentation.

django celery django-celery celery-task
2 Answers

4 votes

What you are describing calls for a distributed lock (since n = 1), although it can be described more generally as a distributed semaphore. Roughly speaking, these locks and mechanisms are not something built into Celery.

As mentioned in the comments (hat tip: @bernhard vallant), a straightforward implementation of a distributed lock would normally make use of a table in your database or redis rlock / redlocks.
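For the database / cache flavour, a minimal sketch could rely on Django's cache.add(), which only stores the key when it is not already present and is atomic on backends such as Memcached or django-redis; the lock key name and the 15-minute timeout here are just placeholders:

(views.py)
...
from django.core.cache import cache

if request.method == 'POST':
    # cache.add() returns False when the key already exists, so it doubles
    # as a simple per-user lock (placeholder key name and timeout)
    if cache.add(f'task_one_lock:{user.id}', 'locked', timeout=60 * 15):
        task_one.delay(user.id)
...

The task itself would then have to cache.delete() the key in a finally block when it finishes, otherwise new submissions from that user stay blocked until the timeout expires.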

To make use of a common redlock implementation, you could do the following:

from redlock import MultipleRedlockException, Redlock
from django.conf import settings

@app.task(name='task_one', autoretry_for=(MultipleRedlockException, ),  retry_kwargs={'max_retries': 5})
def task_one(user_id, *args, **kwargs):
    # assumes you are using redis for django cache with location
    # set to the redis url
    lock_manager = Redlock([ settings.CACHES['default']['LOCATION'] ])
    lock_name = f'task_one:{user_id}'
    # if the lock fails, we'll get the MultipleRedlockException and trigger
    # celery auto retry
    lock_manager.lock(lock_name, 60 * 60 * 2)  # lock for 2 hours
    try:
        # the main body of what you want to do goes here
        pass
    finally:
        lock_manager.unlock(lock_name)


0 votes

2ps's answer is correct, but it needs a few corrections:

  1. If the lock is already in place, the .lock call returns False; it does not raise an exception.
  2. The lock timeout is in milliseconds, not seconds.
  3. The .unlock call needs to receive the lock object, not the lock name or id.

Source: https://github.com/SPSCommerce/redlock-py?tab=readme-ov-file

I'm using Celery with Django, and this is what worked for me:

from django.conf import settings
from celery import shared_task
from redlock import Redlock

# Set bind=True to access the task from within
@shared_task(bind=True)
def sample_task(self, user_id):
    # 1. Create the log manager
    lock_manager = Redlock(
        [
            {
                "host": settings.REDIS_HOST,
                "port": settings.REDIS_PORT,
                "db": settings.REDIS_DB,
            },
        ]
    )

    # 2. Set the lock id
    lock_id = f"{user_id}"

    # 3. Try to lock the task with the user_id
    lock = lock_manager.lock(lock_id, 1000 * 60 * 10)  # Lock for 10 minutes

    # 4. Handle lock in use
    if not lock:
        # The lock is in use
        self.retry(countdown=30)  # Retry the task after 30 seconds
    
    try:
        # ... your long task ...
        pass
    finally:
        # Use finally to make sure the unlock happens even if
        # task fails.

        if lock:
            lock_manager.unlock(lock)
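
One thing worth noting about the retry: self.retry() respects the task's max_retries setting, which defaults to 3, so after a few failed attempts to get the lock the task gives up with MaxRetriesExceededError. If you would rather have it keep waiting until the lock is released, a sketch of the decorator (the rest of the task body stays the same) could be:

# max_retries=None disables the retry limit, so the task keeps retrying
# (every 30 seconds, per the countdown above) until the lock is free
@shared_task(bind=True, max_retries=None)
def sample_task(self, user_id):
    ...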