如何强制单个消费者使用注释驱动rabbitmq spring应用程序

问题描述 投票:0回答:1

我在这里有些摸不着头脑。我需要使应用程序能够拥有一个队列的单个消费者。所以我的第一直觉反应是这样做:

    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(jsonConverter());
        factory.setConcurrentConsumers(1);
        factory.setMaxConcurrentConsumers(1);
        return factory;
    }

当我向队列发送一些消息进行测试时,我确实注意到一次只有一个侦听器处于活动状态,但我也在日志中注意到了这一点:

2024-02-29T16:50:54.338-05:00 DEBUG 49772 --- [pool-2-thread-9] o.s.a.r.listener.BlockingQueueConsumer   : Storing delivery for consumerTag: 'amq.ctag-EM8Q3ba8sEeww7wnKOwY3Q' with deliveryTag: '38' in Consumer@25de8898: tags=[[amq.ctag-EM8Q3ba8sEeww7wnKOwY3Q]], channel=Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,1), conn: Proxy@ad0bb4e Shared Rabbit Connection: SimpleConnection@19705650 [delegate=amqp://[email protected]:5672/, localPort=65348], acknowledgeMode=MANUAL local queue size=30
2024-02-29T16:50:54.466-05:00 DEBUG 49772 --- [ool-2-thread-10] o.s.a.r.listener.BlockingQueueConsumer   : Storing delivery for consumerTag: 'amq.ctag-EM8Q3ba8sEeww7wnKOwY3Q' with deliveryTag: '39' in Consumer@25de8898: tags=[[amq.ctag-EM8Q3ba8sEeww7wnKOwY3Q]], channel=Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,1), conn: Proxy@ad0bb4e Shared Rabbit Connection: SimpleConnection@19705650 [delegate=amqp://[email protected]:5672/, localPort=65348], acknowledgeMode=MANUAL local queue size=31

在查看 Rabbit 控制台时,我还注意到消息处于未确认状态,但尚未准备好。

所以我猜

BlockingQueueConsumer
确实将它们从队列中删除,并在内部控制并发。有没有一种方法可以使用注释实际上强制一次只提取一条消息?或者我是否必须切换到手动投票才能实现这一目标?

谢谢大家

rabbitmq spring-rabbit
1个回答
0
投票

为此目的,您不得使用

factory.setMaxConcurrentConsumers(1);
。 保持原样。
AsyncMessageProcessingConsumer.mainLoop()
中有逻辑:

boolean receivedOk = receiveAndExecute(this.consumer); // At least one message received
if (SimpleMessageListenerContainer.this.maxConcurrentConsumers != null) {
    checkAdjust(receivedOk);
}

因此,如果该属性不是

BlockingQueueConsumer
,我们确实会尝试开始一个新的
null

您可以在

concurrency
中的
SimpleMessageListenerContainer
属性的 JavaDocs 中找到更多信息。

还有一个可能对您的用例感兴趣:

/**
 * Set to true for an exclusive consumer - if true, the concurrency must be 1.
 * @param exclusive true for an exclusive consumer.
 */
@Override
public final void setExclusive(boolean exclusive) {

文档中还有一些信息:https://docs.spring.io/spring-amqp/reference/amqp/listener-concurrency.html

您可能也有兴趣将此选项设置为

1
:

/**
 * Tell the broker how many messages to send to each consumer in a single request.
 * Often this can be set quite high to improve throughput.
 * @param prefetchCount the prefetch count
 * @see com.rabbitmq.client.Channel#basicQos(int, boolean)
 */
public void setPrefetchCount(int prefetchCount) {
© www.soinside.com 2019 - 2024. All rights reserved.