我正在开发一个基于 docker、由 Celery 支持的 Python 应用程序,其中功能之一是触发并发送给定数字的文本消息。工作流程如下:
目前,如果我上传包含 3 个条目的 CSV 文件,则每个短信操作都会按顺序触发,而不是并行触发(默认 celery 进程行为)。例如,如果调度程序每 10 秒从队列中获取一个作业,则发送 3 条短信所需的时间将为 30 秒。 由于这些作业是相互独立的,我想将其并行化,以便同时发送所有三个短信。
我尝试增加队列的并发性,但假设每个线程都会被分配三个消息之一,但它不起作用。我担心我可能缺少一些东西。 是否需要添加一些其他配置才能并行化作业?
运行 celery 队列的命令
celery worker --app=worker.app --concurrency=5 --hostname=worker1@%h --loglevel=INFO-Q queue1 -Ofair
芹菜配置
app = Celery(
'worker',
broker=os.environ['CELERY_BROKER'],
backend=os.environ['RABBITMQ_BACKEND'],
include=['worker.tasks','worker.schedule']
)
app.conf.update(
result_expires=3600,
task_track_started=True,
worker_prefetch_multiplier = 5
)
app.conf.beat_schedule = {
"get-message": {
"task": "worker.schedule.get_new_messages",
"schedule": 10,
'options': {'queue' : 'queue1'}
}
}
您可以使用 celery 中的组
组用于并行执行任务。组函数接受签名列表。
可能对参考有帮助,请参阅下面的文档
https://sayari3.com/articles/18-chains-groups-and-chords-in-celery/
解决此问题的最佳方法是修复 cron 作业。在您的 cron 作业中,不是将消息放入队列,而是调用您的 celery 任务。当前按顺序处理消息的原因取决于您对
worker.schedule.get_new_messages
的实现。最有可能的是,该函数一次从队列中拉出多条消息,而处理这些消息的函数一次只处理一条消息。
解决这个问题的方法是创建一个发送一条消息的任务,仅此而已。例如:
@app.task('send_my_cool_message')
def send_sms_message(from_, to_, text):
twilio_client = Client(settings.ACCOUNT_SID, settings.AUTH_TOKEN)
twilio_client.messages.create(to=to_, from=from_, body=text)
现在在您的 cron 作业中,您为每条消息调用一个 celery 任务:
from celery import Celery
qs = Messages.objects.filter(created_at__gte=last_date_polled)
app = Celery(broker=settings.BROKER_URL, backend=settings.BACKEND_URL)
for message in qs:
app.send_task('send_my_cool_message', kwargs={
'from_': message.from_,
'to_': message.to_,
'text': message.text,
})
我知道这是一个老问题,但它可能会帮助处于相同情况的其他人。
我也面临同样的问题,只是我的应用程序通过“post/json”向电子邮件云网络服务发送电子邮件(而不是短信)。我正在寻找一种方法从 RabbitMQ 获取“X”个任务,使用 asyncio 并行处理它们,然后通知 RabbitMQ 有关每个任务的信息...我发现您需要使用 greenlet 或线程来指示celery Worker 使用异步方式以异步方式处理任务。
例如,在最初的帖子中,用于运行 celery Worker 的命令是:
celery worker --app=worker.app --concurrency=5 --hostname=worker1@%h --loglevel=INFO-Q queue1 -Ofair
需要将要使用的池类型添加到celery工作池类型中(默认为:prefork),例如:
celery worker --app=worker.app --pool=gevent --concurrency=5 --hostname=worker1@%h --loglevel=INFO-Q queue1 -Ofair
请注意,您需要先安装“gevent”才能使用它。
您也可以使用“线程”,但我更喜欢“gevent”,因为我发现它比使用操作系统线程子系统更有效。
更多信息可以在这些链接上找到:
1- https://docs.celeryq.dev/en/stable/userguide/concurrency/gevent.html
2- https://github.com/celery/celery/tree/main/examples/gevent
我还发现以下 YouTube 视频非常有用:
https://www.youtube.com/watch?v=xZ3kNS_G6vs
希望这有帮助!