我正在尝试使用RabbitMQ topic
交换可靠地从发布者向多个消费者发送消息。
我已经配置了持久队列(每个消费者一个),我发送持久性消息delivery_mode=2
。我也在confim_delivery
模式下设置频道,并添加了mandatory=True
标志发布。
现在,该服务非常可靠,但如果在代理重新启动并且消息发布期间保持关闭状态,则消息会丢失给其中一个消费者。
似乎代理可以在重新启动时恢复队列和消息,但它似乎没有保持消费者和队列之间的绑定。因此,消息只会传达给其中一个消费者,而对于那个消费者来说则会迷失。
注意:如果代理在消费者关闭期间没有重新启动,则消息会到达队列和消费者。它们在队列中正确累积,并在它再次启动时传递给消费者。
编辑 - 添加消费者代码:
import pika
class Consumer(object):
def __init__(self, queue_name):
self.queue_name = queue_name
def consume(self):
credentials = pika.PlainCredentials(
username='myuser', password='mypassword')
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='myhost', credentials=credentials))
channel = connection.channel()
channel.exchange_declare(exchange='myexchange', exchange_type='topic')
channel.queue_declare(queue=self.queue_name, durable=True)
channel.queue_bind(
exchange='myexchange', queue=self.queue_name, routing_key='my.route')
channel.basic_consume(
consumer_callback=self.message_received, queue=self.queue_name)
channel.start_consuming()
def message_received(self, channel, basic_deliver, properties, body):
print(f'Message received: {body}')
channel.basic_ack(delivery_tag=basic_deliver.delivery_tag)
您可以假设每个客户服务器执行类似于以下操作:
c = Consumer('myuniquequeue') # each consumer has a permanent queue name
c.consume()
修改 - 添加发布商代码:
def publish(message):
credentials = pika.PlainCredentials(
username='myuser', password='mypassword')
connection = pika.BlockingConnection(
pika.ConnectionParameters(host='myhost', credentials=credentials))
channel = connection.channel()
channel.exchange_declare(exchange='myexchange', exchange_type='topic')
channel.confirm_delivery()
success = channel.basic_publish(
exchange='myexchange',
routing_key='my.route',
body=message,
properties=pika.BasicProperties(
delivery_mode=2, # make message persistent
),
mandatory=True
)
if success:
print("Message sent")
else:
print("Could not send message")
# Save for sending later
值得一提的是,我自己处理错误案例,这不是我想改进的部分。当我的消息丢失给某些消费者时,流程会通过成功部分
在您的消费者回调方法中使用basic.ack(delivery_tag=basic_deliver.delivery_tag)
。此确认告知消费者是否已收到消息并进行了处理。如果是否定确认,则该消息将被重新排队。
编辑#1为了在代理崩溃期间接收消息,需要分发代理。它是RabbitMQ中称为镜像队列的概念。 Mirrored Queues
允许您的队列跨群集中的节点进行复制。如果包含队列的其中一个节点发生故障,则包含该队列的另一个节点将充当您的代理。
有关完整的理解,请参阅此Mirrored Queues