经过多次测试和搜索,我没有得到结果,希望你能指导我。
我的代码可在此 GitHub 地址获取。
由于主要代码比较复杂,我写了一个简单的代码示例,有同样的问题,并链接到上面的地址。
我有一个包含四个 app.tasks 的工作进程,其名称如下:
并且每个app.tasks应该只同时执行一次,也就是说,例如
app_1000
不应该同时执行两三次,而应该一次只执行一次,并且如果当前app_1000
的任务完成,可以转到下一个作业。
broker_url='amqp://guest@localhost//'
result_backend='rpc://'
include=['celery_app.tasks']
worker_prefetch_multiplier = 1
task_routes={
'celery_app.tasks.app_1000':{'queue':'q_app'},
'celery_app.tasks.app_1002':{'queue':'q_app'},
'celery_app.tasks.app_1004':{'queue':'q_app'},
'celery_app.tasks.app_1006':{'queue':'q_app'},
'celery_app.tasks.app_timeout':{'queue':'q_timeout'},
}
如您所见,
worker_prefetch_multiplier = 1
在上面的配置中。
我还使用花脚本来检查任务。
在 Postman 中按下“发送”按钮后,所有这 20 个假设的任务都被发送到 Worker,一开始一切都很好,因为每个 app.tasks 都启动了一个任务。
但几分钟后,当事情继续进行时,app.tasks同时执行,也就是说,例如根据照片,
app_1000
已经启动了两次,或者在下一张照片中,app_1006
已经启动了两次并且它们同时运行,我不打算这样做。情况发生。
我希望app_1000或app_1006一次只做一件事,但我不知道该怎么做。
重要提示:请不要建议为4个app.tasks创建4个队列,因为在我的实际项目中,我有超过100个app.tasks,并且管理所有这些队列非常困难。
可能会出现一个问题,例如为什么app_1000不应该同时执行?这个问题的答案很复杂,要解释的主要代码太多,所以请跳过这个问题。
代码在GitHub(代码量不大,不会占用你太多时间)
如果你想运行它,你可以输入以下命令:
celery -A celery_app worker -Q q_app --loglevel=INFO --concurrency=4 -n worker@%h
celery flower --port=5566
uvivorn api:app --reload
不幸的是,celery 没有为此提供任何开箱即用的解决方案。您必须实现分布式缓存锁定机制并在执行任务之前进行检查。类似的问题和相关答案存在here。
您可以尝试在任务中使用锁,例如:
@app.task(bind=True, autoretry_for=(Exception,), retry_backoff=True)
def app_1000(self, arg):
# Using lock to ensure there's no other task (you can include args on lock_id)
lock_id = 'lock-1000'
with cache_lock(lock_id, self.app.oid) as acquired:
# Skip if there's another task
if not acquired:
logger.debug("Skip task: There's another task doing the same.")
return False
# your task here ...
其中
cache_lock
是:
from django.core.cache import cache
@contextmanager
def cache_lock(lock_id, oid):
timeout_at = monotonic() + LOCK_EXPIRE - 3
# cache.add fails if the key already exists
status = cache.add(lock_id, oid, LOCK_EXPIRE)
try:
yield status
finally:
# memcache delete is very slow, but we have to use it to take
# advantage of using add() for atomic locking
if monotonic() < timeout_at and status:
# don't release the lock if we exceeded the timeout
# to lessen the chance of releasing an expired lock
# owned by someone else
# also don't release the lock if we didn't acquire it
cache.delete(lock_id)