Celery任务已永久启动(未重试)

问题描述 投票:0回答:1

我们在码头工人实例中使用芹菜工人。如果docker实例被杀死(可以更改docker并将其恢复),我们需要重试该任务。我的任务当前看起来像这样:

@app.task(bind=True, max_retries=3, default_retry_delay=5, task_acks_late=True)
def build(self, config, import_data):
    build_chain = chain(
        build_dataset_docstore.s(config, import_data),
        build_index.s(),
        assemble_bundle.s()
    ).on_error(handle_chain_error.s())

    return build_chain

@app.task(bind=True, max_retries=3, default_retry_delay=5, task_acks_late=True)
def build_dataset_docstore(self, config, import_data):
    # do lots of stuff

@app.task(bind=True, max_retries=3, default_retry_delay=5, task_acks_late=True)
def build_index(self, config, import_data):
    # do lots of stuff

@app.task(bind=True, max_retries=3, default_retry_delay=5, task_acks_late=True)
def assemble_bundle(self, config, import_data):
    # do lots of stuff

为了模仿正在重启的容器(工人被杀死),我正在运行以下脚本:

SLEEP_FOR=1

echo "-> Killing worker"
docker-compose-f docker/docker-compose-dev.yml kill worker

echo "-> Waiting $SLEEP_FOR seconds"
sleep $SLEEP_FOR

echo "-> Bringing worker back to life"
docker-compose-f docker/docker-compose-dev.yml start worker

看着花朵,我看到任务已经开始...很酷,但是...

  • 为什么不重试?
  • 我需要手动处理这种情况吗?
  • 如果是,执行此操作的正确方法是什么?

编辑:来自文档:

如果工作人员在经过相当长的时间后不会因陷入无限循环或类似原因而关闭,则可以使用KILL信号强制终止工作人员:但是请注意,当前正在执行的任务将会丢失(即,除非任务中设置了acks_late选项)。

我正在使用Acks Late选项,为什么不重试?

celery django-celery
1个回答
0
投票

这里的问题似乎是task_acks_latehttps://docs.celeryproject.org/en/latest/userguide/configuration.html#task-acks-late),我认为这是芹菜应用程序在任务上的参数。

我将task_acks_late更新为acks_late,并添加了reject_on_worker_lost,此功能可以正常使用。

因此:

@app.task(bind=True, max_retries=3, default_retry_delay=5, acks_late=True, reject_on_worker_lost=True)
def assemble_bundle(self, config, import_data):
    # do lots of stuff
© www.soinside.com 2019 - 2024. All rights reserved.