我们在码头工人实例中使用芹菜工人。如果docker实例被杀死(可以更改docker并将其恢复),我们需要重试该任务。我的任务当前看起来像这样:
@app.task(bind=True, max_retries=3, default_retry_delay=5, task_acks_late=True)
def build(self, config, import_data):
build_chain = chain(
build_dataset_docstore.s(config, import_data),
build_index.s(),
assemble_bundle.s()
).on_error(handle_chain_error.s())
return build_chain
@app.task(bind=True, max_retries=3, default_retry_delay=5, task_acks_late=True)
def build_dataset_docstore(self, config, import_data):
# do lots of stuff
@app.task(bind=True, max_retries=3, default_retry_delay=5, task_acks_late=True)
def build_index(self, config, import_data):
# do lots of stuff
@app.task(bind=True, max_retries=3, default_retry_delay=5, task_acks_late=True)
def assemble_bundle(self, config, import_data):
# do lots of stuff
为了模仿正在重启的容器(工人被杀死),我正在运行以下脚本:
SLEEP_FOR=1
echo "-> Killing worker"
docker-compose-f docker/docker-compose-dev.yml kill worker
echo "-> Waiting $SLEEP_FOR seconds"
sleep $SLEEP_FOR
echo "-> Bringing worker back to life"
docker-compose-f docker/docker-compose-dev.yml start worker
看着花朵,我看到任务已经开始...很酷,但是...
编辑:来自文档:
如果工作人员在经过相当长的时间后不会因陷入无限循环或类似原因而关闭,则可以使用KILL信号强制终止工作人员:但是请注意,当前正在执行的任务将会丢失(即,除非任务中设置了acks_late选项)。
我正在使用Acks Late选项,为什么不重试?
这里的问题似乎是task_acks_late
(https://docs.celeryproject.org/en/latest/userguide/configuration.html#task-acks-late),我认为这是芹菜应用程序在任务上的参数。
我将task_acks_late
更新为acks_late
,并添加了reject_on_worker_lost
,此功能可以正常使用。
因此:
@app.task(bind=True, max_retries=3, default_retry_delay=5, acks_late=True, reject_on_worker_lost=True)
def assemble_bundle(self, config, import_data):
# do lots of stuff