如何在celery任务中同步运行多个celery任务?

问题描述 投票:0回答:1

我必须设计两个任务,即任务A和任务B,用于通过API调用获取数据,这两个任务应该能够按计划每天在后台运行,但也可以根据需要从前端单独触发。现在,这两个任务都使用来自同一提供商的 API,该提供商具有 API 速率限制,因此我决定创建名为 task_main() 的新 celery 任务,该任务将同步触发任务 A 和任务 B 以最大程度地减少并发连接,以便全局令牌桶 API 速率限制器不会获得更多负载。 请帮助我如何实现这一目标。我还使用了芹菜链和部分,但在使用链和回调时,我的第一个任务结果自动传递给我不喜欢的第二个任务,因为两个任务的参数都是固定的,并且始终假设单个参数,因此可以单独触发。 我们将非常感谢您的帮助。

@app.task
def taskA(last_update):
    pass

@app.task
def taskB(last_update):
    pass


@app.task
def taskMain():
    task_chain = chain(taskA.s(last_update), taskB.s(last_update))
    chian_result = task_chain.delay()
celery
1个回答
2
投票

还可以使用Celery的链在单个Celery任务中同步执行taskA和taskB。 每天使用调度程序在后台运行,但也可以从前端单独触发

from celery import Celery, chain

# Initialize Celery
app = Celery('tasks', broker='redis://localhost:6379/0')

# Define taskA and taskB
@app.task
def taskA(last_update):
    # Your code for taskA fetching data via API calls
    pass

@app.task
def taskB(last_update):
    # Your code for taskB fetching data via API calls
    pass

# Define taskMain
@app.task
def taskMain(last_update=None):
    # Trigger taskA and taskB synchronously
    task_chain = chain(taskA.s(last_update), taskB.s(last_update))
    chain_result = task_chain.delay()
    return chain_result

# Define a scheduler (Celery Beat) to run taskMain daily
app.conf.beat_schedule = {
    'run-taskMain-daily': {
        'task': 'path.to.taskMain',  # Specify the path to taskMain
        'schedule': 86400,  # Run every 24 hours (24 hours * 60 minutes * 60 seconds)
    },
}
© www.soinside.com 2019 - 2024. All rights reserved.