在 celery Worker 上加载和卸载模型

问题描述 投票:0回答:1

我目前有一个系统,可以使用 Celery 在 Whisper AI 模型上启动任务。然而,现有的设置涉及在每个任务中加载模型,由于重复加载过程会消耗大量时间,因此这并不是最佳选择。不幸的是,我无法持续加载模型,因为其他系统需要访问 GPU VRAM。

为了解决这个问题,我正在考虑实现一个系统,该系统在收到任务时加载模型,并在没有剩余任务时卸载模型。这种方法旨在通过最大限度地减少模型在内存中加载的时间来优化资源利用率,从而确保 GPU 资源的有效利用。我相信这一调整将带来系统整体性能和响应能力的提高。

这就是想法(显然不起作用)

from celery import Celery, Task
from celery.signals import (
    task_received,
    celeryd_after_setup,
    task_success,
    task_failure,
)
import redis
import torch
import whisper
r = redis.Redis(host=hostname)

model = None

def checkActivesTasksWorker():
    global model
    if r.llen(activesTasksWorker) == 0:
        # Model is deleted when there is no task left on worker
        del model
        torch.cuda.empty_cache()

@celery.task
def myTask():
    launchAnalyticsWhisper(model)

@task_received.connect
def taskReceivedHandler(sender, request, **kwargs):
    if r.llen(activesTasksWorker) == 0:
        model = whisper.load_model("medium")
    r.lpush(activesTasksWorker, request.id)

@task_success.connect(sender=myTask)
def taskSuccessHandler(sender, result, **kwargs):
    r.lrem(activesTasksWorker, 1, result["taskId"])
    checkActivesTasksWorker()


@task_failure.connect(sender=myTask)
def taskFailureHandler(sender, task_id, exception, **kwargs):
    r.lrem(activesTasksWorker, 1, task_id)
    checkActivesTasksWorker()

@celeryd_after_setup.connect
def initList(sender, instance, **kwargs):
    # Clear the active tasks for the worker
    r.delete(activesTasksWorker)

model 到处都是 None,所以全局变量不起作用。 您有想法让这项工作成功吗?

python redis pytorch celery openai-whisper
1个回答
0
投票

Celery 为每个任务生成新进程,这就是为什么它们无法访问全局变量。

做到这一点的最佳方法是为相关模型配备专用的推理服务器。推理服务器在启动时加载模型。 celery 任务向推理服务器发出请求。

如果您必须在同一 GPU 上处理多个模型,您可以向推理服务器添加端点,以在 CPU 内存和 GPU 内存之间移动模型。这样您就可以保持 GPU 内存空闲,同时仍然只从磁盘加载模型一次。

© www.soinside.com 2019 - 2024. All rights reserved.