Celery 优雅关闭

问题描述 投票:0回答:1

我有一个在 Docker 容器中运行的 Python 应用程序。该组件负责一些长时间运行的任务,我想在进行新部署时正常关闭这些任务。所有子工作人员都在没有发出安全关闭信号的情况下被杀死。您知道如何解决这个问题吗?

这是代码:

import time

from celery import Celery
from celery.signals import worker_ready
import logging

log = logging.getLogger()

app = Celery(
    "background_tasks",
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/0",
)


@worker_ready.connect
def at_start(sender, **kwargs):
    log.info("Worker ready")
    with sender.app.connection() as connection:
        sender.app.send_task(daily_orders_import.name, connection=connection)


@app.task(acks_late=True)
def daily_orders_import():
    for u in range(1, 100):
        import_orders_from_account.delay(u)


@app.task(bind=True, acks_late=True)
def import_orders_from_account(self, u):
    while True:
        log.info(self.request)
        log.info(f"Importing orders for account {u}")
        time.sleep(5)
python celery
1个回答
0
投票

有两种方法可以解决这个问题,你可以使用 celery 自己的信号导入,也可以使用系统本身。

要使用 celery 自己的信号系统,您可以使用以下代码:

from celery.signals import worker_shutdown

@worker_shutdown.connect
def on_worker_shutdown(**kwargs):
    # Perform cleanup tasks here, such as stopping long-running tasks gracefully
    pass

或者要使用系统的信号系统,您可以使用这个:

import signal
import sys

def sigterm_handler(signum, frame):
    # Perform cleanup tasks here
    sys.exit(0)

signal.signal(signal.SIGTERM, sigterm_handler)

希望这有帮助!

© www.soinside.com 2019 - 2024. All rights reserved.