使用Python、Flask、Celery和Redis,包含多个组的任务无法调用其上有链的子任务

问题描述 投票:0回答:1
@celery.task(bind=True, ignore_result=False)
def process_ml_model_dataset(self, dataset_serialized, media_serialized, ml_model_serialized, collection_name, n_factor):

    if n_factor == 1:
    result_group = group(process_ml_model_dataset_factor_x.s(dataset_serialized, media_serialized, ml_model_serialized, collection_name, n_factor))
else:
    # tasks = [process_ml_model_dataset_factor_x.s(dataset_serialized, media_serialized, ml_model_serialized, collection_name, i) for i in range(1, n_factor+1)]
    result_group = (group(process_ml_model_dataset_factor_x.s(dataset_serialized, media_serialized, ml_model_serialized, collection_name, i) for i in range(1, n_factor+1)))
    # tasks = []
    # for i in range(1, n_factor+1):
    #     tasks.append(process_ml_model_dataset_factor_x.s(dataset_serialized, media_serialized, ml_model_serialized, collection_name, i))
    # result_group = group(*tasks)

result = result_group()

while not result.ready():
    time.sleep(60)

@celery.task(bind=True, ignore_result=False)
def process_ml_model_dataset_factor_x(self, dataset_serialized, media_serialized, ml_model_serialized, collection_name, factor_x):
result_chain = chain(
    process_ml_model_chunk.s(None, 1, dataset_serialized, ml_model_serialized, first_chunk_pickle, collection_name, factor_x))

reader_second_chunk = FILE_HELPER.csv_reader_chunked(
    MEDIAS_UPLOADS_DEFAULT_DEST + "/" + str(media_serialized["id"]) + "/" + media_serialized["file_name"],
    columns_header, DATASET_CHUNK_SIZE, DATASET_CHUNK_SIZE)
for i, chunk in enumerate(reader_second_chunk):
    result_chain |= process_ml_model_chunk.s(i + 2, dataset_serialized, ml_model_serialized, pickle.dumps(chunk),
                                             collection_name, factor_x)

在下面的简短代码示例中,我的代码/环境无法执行此任务。在此示例中,我请求一组包含任务链的任务执行某些工作,我在工作人员上得到以下输出:

worker-1  | [2024-04-03 09:42:23,143: INFO/MainProcess] Task src.tasks.dataset_full_ml_process.process_ml_model_chunk[d3c55d4c-2c1e-48e5-927f-50dce7ac6738] received
worker-1  | [2024-04-03 09:42:23,210: INFO/MainProcess] Task src.tasks.dataset_full_ml_process.process_ml_model_chunk[26af7b45-d1fd-41f6-88c3-04b2f3dd9f6a] received
worker-1  | [2024-04-03 09:42:23,273: INFO/MainProcess] Task src.tasks.dataset_full_ml_process.process_ml_model_chunk[0314e273-a66b-493a-88e6-6ae81f30cb5e] received

但任务从未执行。使用芹菜接缝很容易适用于任何情况,我已经阅读了文档中的解决方案,搜索了大量的 stackoverflow 线程,但到目前为止还没有机会执行此类任务。

我提供的代码只是一个示例,但我的项目基于这种方法:docker-flask-example by Nickjj

到目前为止我已经尝试过 celery 版本 5.3.6 和 5.4.0rc2。

我的芹菜设置非常困难:

REDIS_URL = os.getenv("REDIS_URL", "redis://redis:6379/0")

# Celery.
CELERY_CONFIG = {
    "broker_url": REDIS_URL,
    "result_backend": REDIS_URL,
    "include": [
        "src.tasks.dataset_full_ml_process"
    ],
}

正如 docker-flask-example 中的 Nickjj 所建议的那样 Worker 的 docker-compose.yml 部分是这样的:

  worker:
<<: *default-app
command: celery -A "src.app.celery_app" worker -l "${CELERY_LOG_LEVEL:-debug}"
entrypoint: []
deploy:
  resources:
    limits:
      cpus: "${DOCKER_WORKER_CPUS:-0}"
      memory: "${DOCKER_WORKER_MEMORY:-0}"
profiles: ["worker"]

PS:当我请求任务 process_ml_model_dataset 且 n_factor == 1 时,一切正常,女巫只创建一个任务的组任务。

python docker flask redis celery
1个回答
0
投票

让一个任务等待另一个任务的结果确实是 效率低下,如果工作池不可用甚至可能导致死锁 筋疲力尽。

您可以在此处阅读更多相关信息。

你的

process_ml_model_dataset
正在等待其他celery任务的结果(一堆
process_ml_model_dataset_factor_x
)。当
process_ml_model_dataset
运行时,它正在等待其他任务,并“占用”该工作人员,因此,其他任务无法运行(如果您没有足够的空闲工作人员)。

  1. 您能做的最好的事情就是从您的
    result_group
    服务器检查
    Flask
    状态。
  2. 如果您想继续执行 Celery 任务,您可以使用
    retry
    机制。这样,任务将尝试获取结果,如果还没有准备好,它将停止运行并在 X 秒内重试(您想要的重试次数)。 这样,工作人员将可以自由地处理您希望的任何子任务。只需运行流程并存储
    result_group
    的 UUID。然后在新任务中使用它,看看它是否准备好。

这是一个伪代码:

@celery.task(bind=True, ignore_result=False, max_retries=10)
def process_ml_model_dataset_status(self, task_id):
    num_retries = int(self.request.retries)
    if num_retries == 0:
        # optional: do your thing only in the first run if needed
        
    result = celery.AsyncResult(task_id)
    if not result.ready():
        raise self.retry(countdown=60). # retry in 60 seconds
    return result.get()
© www.soinside.com 2019 - 2024. All rights reserved.