Celery Chord 在 Python 中如何工作?

问题描述 投票:0回答:1

我有以下代码:

def create_pipeline(
    real: DataFrame,
    forecast_name: str,
    energy_asset_id: int,
    family: str,
    forecast_range: date_range,
    methodologies: List[str],
    start_forecast: datetime,
    subfamily_x_forecast_id: int,
    uuid: str,
    subfamily_id: int,
    execution_id: int,
) -> None:
    forecast_tasks = []
    for methodology in methodologies:
        log.info(f"Methodology: {methodology}")
        forecast_task = celery_app.signature(
            PATHS.get(methodology.lower(), "app.worker.classic.forecast"),
            args=[
                real,
                forecast_name,
                energy_asset_id,
                family,
                methodology,
                forecast_range,
                start_forecast,
                uuid,
                subfamily_id,
            ],
        )
        forecast_tasks.append(forecast_task)

    combination_task = celery_app.signature(
        "app.worker.combinations.pipeline_combinations",
        args=[
            energy_asset_id,
            family,
            methodologies,
            forecast_name,
            subfamily_x_forecast_id,
            uuid,
            start_forecast,
            execution_id,
        ],
        queue=f"combinations-{settings.ENV}"
    )
    group_forecast_task = chord(forecast_tasks)
    group_forecast_task(combination_task)

总共有 8 种方法,因此 Forecast_tasks 是一个包含 8 个任务的列表。 我需要的是执行这 8 个任务,一旦 8 个任务完成,就会执行组合任务任务(添加 group_forecast_task 的结果作为第一个参数),为此我实现了以下部分:

group_forecast_task = chord(forecast_tasks)
group_forecast_task(combination_task)

我最近在另一个项目中像这样实现了它,但现在它对我不起作用! 8 个预测任务已执行,但当它们完成时,combination_task 从未被调用,可能会发生什么?我不太确定和弦实际上是如何工作的以及我是否正确实现了它。

python celery celery-task
1个回答
0
投票

考虑到您没有提到工作人员抛出的任何异常,我认为一切都按预期“工作”。

根据上述假设,这是我的第一个猜测 - 您没有工作人员订阅

combinations-{settings.ENV}
队列,因此没有工作人员来运行该任务。这是一个很常见的错误。

当我启动工作程序时,我通常会在 CLI 中传递队列,例如:

celery -A my.app worker -Q queueA,queueB -O fair ...etc

确保您有

-Q combinations-{settings.ENV}
(将 {settings.ENV} 替换为环境的实际名称)。如果你将任务发送到默认队列并期望该工作人员拿起它们并执行,那么你需要显式地将
celery
添加到队列列表中,在这种特殊情况下它会变成
-Q combinations-{settings.ENV},celery
(初学者经常犯这个错误以及)。

© www.soinside.com 2019 - 2024. All rights reserved.