优雅退出ForkPoolWorker中的celery任务

问题描述 投票:0回答:1

我的任务的简化版本:

for _ in range(100000):
    # Check if worker is not shutting downed
    # if yes, exit gracefully

    # else, Do something.

我尝试用它来处理worker上的SIGTERM

@worker_shutting_down.connect
def worker_shutting_down_handler(sig, how, exitcode, ** kwargs):
    # Set worker_is_shutting_down

问题在于

worker_shutting_down.connect
仅由celery主进程处理。 ForkPoolWorkers 对此一无所知。

如何将此消息从主进程传播到所有 ForkPoolworkers?

一些注意事项:

  1. 我无法在任务上设置 time_limit,因为任务需要长时间运行。就是这么设计的。
  2. 我再次打电话给他们没有问题,但重要的是他们优雅地退出进行一些拆解工作。
  3. 已经尝试过
    worker_process_shutdown.connect
    ,但仅在 ForkPoolProcess 中成功关闭后才会调用。
celery
1个回答
0
投票

也许不是最好的解决方案,但它相当有效。

由于所有 ForkPoolWorkers 的文件系统都是相同的,因此我使用文件作为关闭信号。

from pathlib import Path
from celery.signals import (
    worker_shutting_down,
)

@worker_shutting_down.connect
def worker_shutting_down_handler(*_args, **_kwargs):
    """
    Worker is shutting down
    """
    Path('/tmp/worker_shutting_down').touch()

在任务中

def should_gracefully_exit():
    """
    Check if the tasks should gracefully exit
    """
    return os.path.isfile("/tmp/worker_shutting_down")

def task():
    for _ in range(100000):
        if should_gracefully_exit()
            #teardown

        # else, Do something.

© www.soinside.com 2019 - 2024. All rights reserved.