为什么使用 Pika RabbitMQ,消费者在长时间空闲时会默默地断开队列?

问题描述 投票:0回答:0

在我的公司,我们目前正面临使用 rabbitMQ 的奇怪行为。我们在 Kubernetes 集群中托管的一个微服务是用 Python 构建的(其他的在 Java 中,它们没有显示这个问题)并使用 RabbitMQ 的 Pika 实现进行通信。

在一周内微服务工作得很好,在周末它没有被使用并且仍然在运行。在星期一尝试在此微服务队列上发布消息会导致它们保持未确认状态,因为所有消费者都已断开连接。 查看代码日志,绝对什么都没有:没有错误,没有抛出异常,什么都没有。 服务保持空闲而没有注意到它与队列断开连接.

在第一个实现中,我们有一个 BlockingConnection,但是参考这个问题 https://github.com/pika/pika/issues/877 并怀疑某种空闲终止,我们使用 ioloops 移动到 SelectConnection . heartbeat 配置了标准的 60s 和 broker negotiation。 我认为这在 rabbitMQ 代理中不是问题,因为 Java 中的所有其他微服务都工作得很好并且连接保持存在数周。

我发现的一个非常相似的问题是 Rabbitmq 在 ioloop 保持活动状态时断开与消费者的连接,发行者通过恢复他禁用的心跳来解决它,但对于我们的用例它已经启用了。

这里是 github 的链接,其中包含我们正在使用的消费者的紧密实现:https://github.com/stefanofal-altilia/idle-disconnecting-rabbitmq/blob/main/consumer.py.

可以在下面找到相同的代码。 使用鼠兔 1.2.0

import pika
from pika import ConnectionParameters


def handle(ch, method, properties, body):
    try:
        print("Method called handle")
        print("Message with id {} arrived".format(properties.correlation_id))
        body = "I recevied a message!"
        ch.basic_publish(exchange='',
                         routing_key=properties.reply_to,
                         properties=pika.BasicProperties(correlation_id=properties.correlation_id),
                         body=str.encode(body))
        print("Response sended")
        ch.basic_ack(delivery_tag=method.delivery_tag)
        print("Method ended handle")
    except BaseException as e:
        print(
            "Fatal error on message broker class {} method {} error {}".format("RabbitMQConsumer", "handle",
                                                                               str(e)))


def consume_select_connection():
    print("Method called consume_select_connection")

    def on_open(conn):
        print("Method called on_open")
        conn.channel(on_open_callback=on_channel_open)
        print("Method ended on_open")

    def on_channel_open(channel):
        print("Method called on_channel_open")
        channel.queue_declare("annotation-request-queue", passive=False, durable=True,
                              exclusive=False, auto_delete=False)
        channel.basic_consume("annotation-request-queue", on_message_callback=handle)
        print("Method ended on_channel_open")

    credentials = pika.PlainCredentials(username="guest",
                                        password="guest")
    parameters = ConnectionParameters(
        host="localhost",
        port=5672,
        credentials=credentials,
        blocked_connection_timeout=60)
    connection = pika.SelectConnection(parameters=parameters, on_open_callback=on_open)
    try:
        print("I/O start")
        #  Block on the IOLoop
        connection.ioloop.start()

    # Catch a Keyboard Interrupt to make sure that the connection is closed cleanly
    except KeyboardInterrupt:
        print("keyboard exception")
        # Gracefully close the connection
        connection.close()
    except BaseException as e:
        print(("BASE exception: ", str(e)))


if __name__ == '__main__':
    consume_select_connection()

rabbitmq pika messagebroker
© www.soinside.com 2019 - 2024. All rights reserved.