我有一个在 Docker 容器中运行的 Python 应用程序。该组件负责一些长时间运行的任务,我想在进行新部署时正常关闭这些任务。所有子工作人员都在没有发出安全关闭信号的情况下被杀死。您知道如何解决这个问题吗?
这是代码:
import time
from celery import Celery
from celery.signals import worker_ready
import logging
log = logging.getLogger()
app = Celery(
"background_tasks",
broker="redis://localhost:6379/0",
backend="redis://localhost:6379/0",
)
@worker_ready.connect
def at_start(sender, **kwargs):
log.info("Worker ready")
with sender.app.connection() as connection:
sender.app.send_task(daily_orders_import.name, connection=connection)
@app.task(acks_late=True)
def daily_orders_import():
for u in range(1, 100):
import_orders_from_account.delay(u)
@app.task(bind=True, acks_late=True)
def import_orders_from_account(self, u):
while True:
log.info(self.request)
log.info(f"Importing orders for account {u}")
time.sleep(5)
有两种方法可以解决这个问题,你可以使用 celery 自己的信号导入,也可以使用系统本身。
要使用 celery 自己的信号系统,您可以使用以下代码:
from celery.signals import worker_shutdown
@worker_shutdown.connect
def on_worker_shutdown(**kwargs):
# Perform cleanup tasks here, such as stopping long-running tasks gracefully
pass
或者要使用系统的信号系统,您可以使用这个:
import signal
import sys
def sigterm_handler(signum, frame):
# Perform cleanup tasks here
sys.exit(0)
signal.signal(signal.SIGTERM, sigterm_handler)
希望这有帮助!