我的任务的简化版本:
for _ in range(100000):
# Check if worker is not shutting downed
# if yes, exit gracefully
# else, Do something.
我尝试用它来处理worker上的SIGTERM
@worker_shutting_down.connect
def worker_shutting_down_handler(sig, how, exitcode, ** kwargs):
# Set worker_is_shutting_down
问题在于
worker_shutting_down.connect
仅由celery主进程处理。 ForkPoolWorkers 对此一无所知。
如何将此消息从主进程传播到所有 ForkPoolworkers?
一些注意事项:
worker_process_shutdown.connect
,但仅在 ForkPoolProcess 中成功关闭后才会调用。也许不是最好的解决方案,但它相当有效。
由于所有 ForkPoolWorkers 的文件系统都是相同的,因此我使用文件作为关闭信号。
from pathlib import Path
from celery.signals import (
worker_shutting_down,
)
@worker_shutting_down.connect
def worker_shutting_down_handler(*_args, **_kwargs):
"""
Worker is shutting down
"""
Path('/tmp/worker_shutting_down').touch()
在任务中
def should_gracefully_exit():
"""
Check if the tasks should gracefully exit
"""
return os.path.isfile("/tmp/worker_shutting_down")
def task():
for _ in range(100000):
if should_gracefully_exit()
#teardown
# else, Do something.