如何使用pika消费RabbitMQ队列中的所有消息

问题描述 投票:0回答:2

我想用Python编写一个守护进程,它定期唤醒以处理RabbitMQ队列中排队的一些数据。

当守护进程醒来时,它应该消耗队列中的所有消息(或

min(len(queue), N)
,其中N是任意数字),因为批量处理数据更好。有没有办法在
pika
中执行此操作,而不是传递在每次消息到达时调用的回调?

谢谢。

rabbitmq pika
2个回答
2
投票

这是使用 pika 编写的代码。可以使用

basic.get

编写类似的函数

下面的代码将利用

channel.consume
开始消费消息。当达到所需的消息数量时,我们会中断/停止。

我设置了一个

batch_size
来防止一次拉取大量消息。您可以随时更改
batch_size
以满足您的需求。

from pika import BasicProperties, URLParameters
from pika.adapters.blocking_connection import BlockingChannel, BlockingConnection
from pika.exceptions import ChannelWrongStateError, StreamLostError, AMQPConnectionError
from pika.exchange_type import ExchangeType
import json

def consume_messages(queue_name: str):
    msgs = list([])
    batch_size = 500

    q = channel.queue_declare(queue_name, durable=True, exclusive=False, auto_delete=False)
    q_length = q.method.message_count
    
    if not q_length:
        return msgs

    msgs_limit = batch_size if q_length > batch_size else q_length

    try:
        # Get messages and break out
        for method_frame, properties, body in channel.consume(queue_name):

            # Append the message
            try:
                msgs.append(json.loads(bytes.decode(body)))
            except:
                logger.info(f"Rabbit Consumer : Received message in wrong format {str(body)}")

            # Acknowledge the message
            channel.basic_ack(method_frame.delivery_tag)

            # Escape out of the loop when desired msgs are fetched
            if method_frame.delivery_tag == msgs_limit:

                # Cancel the consumer and return any pending messages
                requeued_messages = channel.cancel()
                print('Requeued %i messages' % requeued_messages)
                break

    except (ChannelWrongStateError, StreamLostError, AMQPConnectionError) as e:
        logger.info(f'Connection Interrupted: {str(e)}')

    finally:
        # Close the channel and the connection
        channel.stop_consuming()
        channel.close()

    return msgs

0
投票

您可以使用 basic.get API,它从代理中提取消息,而不是订阅要推送的消息

© www.soinside.com 2019 - 2024. All rights reserved.