队列最大长度或TTL使用get但不消耗

问题描述 投票:1回答:1

我看到许多帖子询问限制队列长度。在我对Pika和RabbitMQ的实验中,如果我使用arguments={'x-message-ttl': 1000, 'x-max-length': 2, 'x-overflow': 'drop-head'}声明队列,甚至在生成消息属性时将expiration='1000'添加到消息属性中,我可以看到所有这三个都有助于从队列中删除消息。我的目标是确保消费者只收到最新信息。

但是,正如这里指出的那样:RabbitMQ messages delivered after x-message-ttl has expired,我只能使用basic_get而不是basic_consume来使用它。

basic_get似乎拉消息,每次发送一个请求。我需要能够等待服务器推送消息,而不是轮询它。消费者不是正确的选择吗?消费者有什么要求利用x-message-ttlx-max-length(我试过basic_qos(prefetch_count=1))?

rabbitmq amqp pika
1个回答
1
投票

问题是autoAck在您链接的代码中设置为True。如果你将autoAck设置为True,那么当消息在缓冲区中时,它就会被视为已接收,并且RabbitMQ将向您发送新消息。

这基本上意味着你所有的Thread.sleep(300)都会延迟消息的显示时间。消息仍在接收并放入缓冲区,更糟糕的是,如果应用程序在缓冲区中仍有消息时崩溃,则缓冲区中的所有消息都将丢失。

如果你在另一方面关闭autoAck,你有prefetch_count设置为1。在第一条消息被标记为已确认之前,RabbitMQ不会向消费者发送新消息。

channel.basicConsume(queueName, false, new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            System.out.println(Thread.currentThread().toString() + " - " + new String(body));
            try {
                Thread.sleep(300);
            } finally {
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        }
    });

这速度要慢得多,但它也更安全,并且确保您只收到可以处理的消息。

© www.soinside.com 2019 - 2024. All rights reserved.