我想用Python编写一个守护进程,它定期唤醒以处理RabbitMQ队列中排队的一些数据。
当守护进程醒来时,它应该消耗队列中的所有消息(或
min(len(queue), N)
,其中N是任意数字),因为批量处理数据更好。有没有办法在 pika
中执行此操作,而不是传递在每次消息到达时调用的回调?
谢谢。
这是使用 pika 编写的代码。可以使用
basic.get
编写类似的函数
下面的代码将利用
channel.consume
开始消费消息。当达到所需的消息数量时,我们会中断/停止。
我设置了一个
batch_size
来防止一次拉取大量消息。您可以随时更改 batch_size
以满足您的需求。
from pika import BasicProperties, URLParameters
from pika.adapters.blocking_connection import BlockingChannel, BlockingConnection
from pika.exceptions import ChannelWrongStateError, StreamLostError, AMQPConnectionError
from pika.exchange_type import ExchangeType
import json
def consume_messages(queue_name: str):
msgs = list([])
batch_size = 500
q = channel.queue_declare(queue_name, durable=True, exclusive=False, auto_delete=False)
q_length = q.method.message_count
if not q_length:
return msgs
msgs_limit = batch_size if q_length > batch_size else q_length
try:
# Get messages and break out
for method_frame, properties, body in channel.consume(queue_name):
# Append the message
try:
msgs.append(json.loads(bytes.decode(body)))
except:
logger.info(f"Rabbit Consumer : Received message in wrong format {str(body)}")
# Acknowledge the message
channel.basic_ack(method_frame.delivery_tag)
# Escape out of the loop when desired msgs are fetched
if method_frame.delivery_tag == msgs_limit:
# Cancel the consumer and return any pending messages
requeued_messages = channel.cancel()
print('Requeued %i messages' % requeued_messages)
break
except (ChannelWrongStateError, StreamLostError, AMQPConnectionError) as e:
logger.info(f'Connection Interrupted: {str(e)}')
finally:
# Close the channel and the connection
channel.stop_consuming()
channel.close()
return msgs
您可以使用 basic.get API,它从代理中提取消息,而不是订阅要推送的消息