如何在python中使用pika(RabbitMQ)向消费者添加多处理

问题描述 投票:1回答:1

我在python中使用pika框架编写了非常基本的生产者 - 消费者代码。问题是 - 消费者端队列中的消息运行速度太慢。我运行了一些测试,发现我可以通过多处理将工作流程加速到27倍。问题是 - 我不知道在我的代码中添加多处理功能的正确方法是什么。

import pika
import json
from datetime import datetime
from functions import download_xmls


def callback(ch, method, properties, body):
    print('Got something')
    body = json.loads(body)
    type = body[-1]['Type']
    print('Object type in work currently ' + type)
    cnums = [x['cadnum'] for x in body[:-1]]
    print('Got {} cnums to work with'.format(len(cnums)))

    date_start = datetime.now()
    download_xmls(type,cnums)
    date_end = datetime.now()
    ch.basic_ack(delivery_tag=method.delivery_tag)
    print('Download complete in {} seconds'.format((date_end-date_start).total_seconds()))


def consume(queue_name = 'bot-test'):
    parameters = pika.URLParameters('server@address')
    connection = pika.BlockingConnection(parameters)
    channel = connection.channel()
    channel.queue_declare(queue=queue_name, durable=True)
    channel.basic_qos(prefetch_count=1)
    channel.basic_consume(callback, queue='bot-test')
    print(' [*] Waiting for messages. To exit press CTRL+C')
    channel.start_consuming()

如何从这里开始添加多处理功能?

python rabbitmq multiprocessing pika
1个回答
0
投票

Pika有广泛的example code,我建议你看看。请注意,此代码仅供使用。在线程上工作的情况下,您将不得不使用更智能的方式来管理线程。

目标是不阻止运行Pika的IO循环的线程,并从您的工作线程正确回调IO循环。这就是add_callback_threadsafe存在并用于该代码的原因。


注意:RabbitMQ团队监控rabbitmq-users mailing list,有时只回答StackOverflow上的问题。

© www.soinside.com 2019 - 2024. All rights reserved.