Celery 中 app.tasks 如何不同时启动?

问题描述 投票:0回答:2

经过多次测试和搜索,我没有得到结果,希望你能指导我。
我的代码可在此 GitHub 地址获取。
由于主要代码比较复杂,我写了一个简单的代码示例,有同样的问题,并链接到上面的地址。
我有一个包含四个 app.tasks 的工作进程,其名称如下:

  • app_1000
  • app_1002
  • app_1004
  • app_1006

并且每个app.tasks应该只同时执行一次,也就是说,例如

app_1000
不应该同时执行两三次,而应该一次只执行一次,并且如果当前
app_1000
的任务完成,可以转到下一个作业。
芹菜配置:

broker_url='amqp://guest@localhost//'
result_backend='rpc://'
include=['celery_app.tasks']
worker_prefetch_multiplier = 1

task_routes={
    'celery_app.tasks.app_1000':{'queue':'q_app'},
    'celery_app.tasks.app_1002':{'queue':'q_app'},
    'celery_app.tasks.app_1004':{'queue':'q_app'},
    'celery_app.tasks.app_1006':{'queue':'q_app'},
    'celery_app.tasks.app_timeout':{'queue':'q_timeout'},
}

如您所见,

worker_prefetch_multiplier = 1
在上面的配置中。


我使用fastapi发送请求,示例请求如下(为了简化问题,我只通过fastapi发送该worker必须执行的任务数量)




我还使用花脚本来检查任务。
在 Postman 中按下“发送”按钮后,所有这 20 个假设的任务都被发送到 Worker,一开始一切都很好,因为每个 app.tasks 都启动了一个任务。



但几分钟后,当事情继续进行时,app.tasks同时执行,也就是说,例如根据照片,

app_1000
已经启动了两次,或者在下一张照片中,
app_1006
已经启动了两次并且它们同时运行,我不打算这样做。情况发生。

几分钟后:


我希望app_1000或app_1006一次只做一件事,但我不知道该怎么做。

重要提示:请不要建议为4个app.tasks创建4个队列,因为在我的实际项目中,我有超过100个app.tasks,并且管理所有这些队列非常困难。
可能会出现一个问题,例如为什么app_1000不应该同时执行?这个问题的答案很复杂,要解释的主要代码太多,所以请跳过这个问题。
代码在GitHub(代码量不大,不会占用你太多时间) 如果你想运行它,你可以输入以下命令:

celery -A celery_app  worker -Q q_app --loglevel=INFO --concurrency=4 -n  worker@%h 
celery flower --port=5566
uvivorn api:app --reload

谢谢
python rabbitmq celery
2个回答
0
投票

不幸的是,celery 没有为此提供任何开箱即用的解决方案。您必须实现分布式缓存锁定机制并在执行任务之前进行检查。类似的问题和相关答案存在here


0
投票

您可以尝试在任务中使用锁,例如:

@app.task(bind=True, autoretry_for=(Exception,), retry_backoff=True)
def app_1000(self, arg):
    # Using lock to ensure there's no other task (you can include args on lock_id)
    lock_id = 'lock-1000'
    with cache_lock(lock_id, self.app.oid) as acquired:

        # Skip if there's another task
        if not acquired:
            logger.debug("Skip task: There's another task doing the same.")
            return False

        # your task here ...

其中

cache_lock
是:

from django.core.cache import cache


@contextmanager
def cache_lock(lock_id, oid):
    timeout_at = monotonic() + LOCK_EXPIRE - 3
    # cache.add fails if the key already exists
    status = cache.add(lock_id, oid, LOCK_EXPIRE)
    try:
        yield status
    finally:
        # memcache delete is very slow, but we have to use it to take
        # advantage of using add() for atomic locking
        if monotonic() < timeout_at and status:
            # don't release the lock if we exceeded the timeout
            # to lessen the chance of releasing an expired lock
            # owned by someone else
            # also don't release the lock if we didn't acquire it
            cache.delete(lock_id)

© www.soinside.com 2019 - 2024. All rights reserved.