我有以下代码:
def create_pipeline(
real: DataFrame,
forecast_name: str,
energy_asset_id: int,
family: str,
forecast_range: date_range,
methodologies: List[str],
start_forecast: datetime,
subfamily_x_forecast_id: int,
uuid: str,
subfamily_id: int,
execution_id: int,
) -> None:
forecast_tasks = []
for methodology in methodologies:
log.info(f"Methodology: {methodology}")
forecast_task = celery_app.signature(
PATHS.get(methodology.lower(), "app.worker.classic.forecast"),
args=[
real,
forecast_name,
energy_asset_id,
family,
methodology,
forecast_range,
start_forecast,
uuid,
subfamily_id,
],
)
forecast_tasks.append(forecast_task)
combination_task = celery_app.signature(
"app.worker.combinations.pipeline_combinations",
args=[
energy_asset_id,
family,
methodologies,
forecast_name,
subfamily_x_forecast_id,
uuid,
start_forecast,
execution_id,
],
queue=f"combinations-{settings.ENV}"
)
group_forecast_task = chord(forecast_tasks)
group_forecast_task(combination_task)
总共有 8 种方法,因此 Forecast_tasks 是一个包含 8 个任务的列表。 我需要的是执行这 8 个任务,一旦 8 个任务完成,就会执行组合任务任务(添加 group_forecast_task 的结果作为第一个参数),为此我实现了以下部分:
group_forecast_task = chord(forecast_tasks)
group_forecast_task(combination_task)
我最近在另一个项目中像这样实现了它,但现在它对我不起作用! 8 个预测任务已执行,但当它们完成时,combination_task 从未被调用,可能会发生什么?我不太确定和弦实际上是如何工作的以及我是否正确实现了它。
考虑到您没有提到工作人员抛出的任何异常,我认为一切都按预期“工作”。
根据上述假设,这是我的第一个猜测 - 您没有工作人员订阅
combinations-{settings.ENV}
队列,因此没有工作人员来运行该任务。这是一个很常见的错误。
当我启动工作程序时,我通常会在 CLI 中传递队列,例如:
celery -A my.app worker -Q queueA,queueB -O fair ...etc
确保您有
-Q combinations-{settings.ENV}
(将 {settings.ENV} 替换为环境的实际名称)。如果你将任务发送到默认队列并期望该工作人员拿起它们并执行,那么你需要显式地将 celery
添加到队列列表中,在这种特殊情况下它会变成 -Q combinations-{settings.ENV},celery
(初学者经常犯这个错误以及)。