Celery 队列同时获取多条消息

问题描述 投票:0回答:3

我正在开发一个基于 docker、由 Celery 支持的 Python 应用程序,其中功能之一是触发并发送给定数字的文本消息。工作流程如下:

  1. 用户上传包含一组条目的 CSV
  2. cron 作业每 60 秒轮询一次数据库以获取任何新条目并将它们添加到队列中
  3. 如果发现新条目,将其放入队列并触发短信

目前,如果我上传包含 3 个条目的 CSV 文件,则每个短信操作都会按顺序触发,而不是并行触发(默认 celery 进程行为)。例如,如果调度程序每 10 秒从队列中获取一个作业,则发送 3 条短信所需的时间将为 30 秒。 由于这些作业是相互独立的,我想将其并行化,以便同时发送所有三个短信。

我尝试增加队列的并发性,但假设每个线程都会被分配三个消息之一,但它不起作用。我担心我可能缺少一些东西。 是否需要添加一些其他配置才能并行化作业?

运行 celery 队列的命令

celery worker --app=worker.app --concurrency=5 --hostname=worker1@%h --loglevel=INFO-Q queue1 -Ofair

芹菜配置


app = Celery(
    'worker',
    broker=os.environ['CELERY_BROKER'],
    backend=os.environ['RABBITMQ_BACKEND'],
    include=['worker.tasks','worker.schedule']
)



app.conf.update(
    result_expires=3600,
    task_track_started=True,
    worker_prefetch_multiplier = 5
)

app.conf.beat_schedule = {
    "get-message": {
        "task": "worker.schedule.get_new_messages",
        "schedule": 10,
        'options': {'queue' : 'queue1'}
    }
}
python docker rabbitmq celery
3个回答
0
投票

您可以使用 celery 中的组

组用于并行执行任务。组函数接受签名列表。

可能对参考有帮助,请参阅下面的文档

https://sayari3.com/articles/18-chains-groups-and-chords-in-celery/


0
投票

解决此问题的最佳方法是修复 cron 作业。在您的 cron 作业中,不是将消息放入队列,而是调用您的 celery 任务。当前按顺序处理消息的原因取决于您对

worker.schedule.get_new_messages
的实现。最有可能的是,该函数一次从队列中拉出多条消息,而处理这些消息的函数一次只处理一条消息。

解决这个问题的方法是创建一个发送一条消息的任务,仅此而已。例如:

@app.task('send_my_cool_message')
def send_sms_message(from_, to_, text):
    twilio_client = Client(settings.ACCOUNT_SID, settings.AUTH_TOKEN)
    twilio_client.messages.create(to=to_, from=from_, body=text)

现在在您的 cron 作业中,您为每条消息调用一个 celery 任务:

from celery import Celery
qs = Messages.objects.filter(created_at__gte=last_date_polled)
app = Celery(broker=settings.BROKER_URL, backend=settings.BACKEND_URL)
for message in qs:
    app.send_task('send_my_cool_message', kwargs={
        'from_': message.from_,
        'to_': message.to_,
        'text': message.text,
    })

0
投票

我知道这是一个老问题,但它可能会帮助处于相同情况的其他人。

我也面临同样的问题,只是我的应用程序通过“post/json”向电子邮件云网络服务发送电子邮件(而不是短信)。我正在寻找一种方法从 RabbitMQ 获取“X”个任务,使用 asyncio 并行处理它们,然后通知 RabbitMQ 有关每个任务的信息...我发现您需要使用 greenlet 或线程来指示celery Worker 使用异步方式以异步方式处理任务。

例如,在最初的帖子中,用于运行 celery Worker 的命令是:

celery worker --app=worker.app --concurrency=5 --hostname=worker1@%h --loglevel=INFO-Q queue1 -Ofair

需要将要使用的池类型添加到celery工作池类型中(默认为:prefork),例如:

celery worker --app=worker.app --pool=gevent --concurrency=5 --hostname=worker1@%h --loglevel=INFO-Q queue1 -Ofair

请注意,您需要先安装“gevent”才能使用它。

您也可以使用“线程”,但我更喜欢“gevent”,因为我发现它比使用操作系统线程子系统更有效。

更多信息可以在这些链接上找到:

1- https://docs.celeryq.dev/en/stable/userguide/concurrency/gevent.html

2- https://github.com/celery/celery/tree/main/examples/gevent

我还发现以下 YouTube 视频非常有用:

https://www.youtube.com/watch?v=xZ3kNS_G6vs

希望这有帮助!

© www.soinside.com 2019 - 2024. All rights reserved.