我在这里有些摸不着头脑。我需要使应用程序能够拥有一个队列的单个消费者。所以我的第一直觉反应是这样做:
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(jsonConverter());
factory.setConcurrentConsumers(1);
factory.setMaxConcurrentConsumers(1);
return factory;
}
当我向队列发送一些消息进行测试时,我确实注意到一次只有一个侦听器处于活动状态,但我也在日志中注意到了这一点:
2024-02-29T16:50:54.338-05:00 DEBUG 49772 --- [pool-2-thread-9] o.s.a.r.listener.BlockingQueueConsumer : Storing delivery for consumerTag: 'amq.ctag-EM8Q3ba8sEeww7wnKOwY3Q' with deliveryTag: '38' in Consumer@25de8898: tags=[[amq.ctag-EM8Q3ba8sEeww7wnKOwY3Q]], channel=Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,1), conn: Proxy@ad0bb4e Shared Rabbit Connection: SimpleConnection@19705650 [delegate=amqp://[email protected]:5672/, localPort=65348], acknowledgeMode=MANUAL local queue size=30
2024-02-29T16:50:54.466-05:00 DEBUG 49772 --- [ool-2-thread-10] o.s.a.r.listener.BlockingQueueConsumer : Storing delivery for consumerTag: 'amq.ctag-EM8Q3ba8sEeww7wnKOwY3Q' with deliveryTag: '39' in Consumer@25de8898: tags=[[amq.ctag-EM8Q3ba8sEeww7wnKOwY3Q]], channel=Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/,1), conn: Proxy@ad0bb4e Shared Rabbit Connection: SimpleConnection@19705650 [delegate=amqp://[email protected]:5672/, localPort=65348], acknowledgeMode=MANUAL local queue size=31
在查看 Rabbit 控制台时,我还注意到消息处于未确认状态,但尚未准备好。
所以我猜
BlockingQueueConsumer
确实将它们从队列中删除,并在内部控制并发。有没有一种方法可以使用注释实际上强制一次只提取一条消息?或者我是否必须切换到手动投票才能实现这一目标?
谢谢大家
为此目的,您不得使用
factory.setMaxConcurrentConsumers(1);
。
保持原样。 AsyncMessageProcessingConsumer.mainLoop()
中有逻辑:
boolean receivedOk = receiveAndExecute(this.consumer); // At least one message received
if (SimpleMessageListenerContainer.this.maxConcurrentConsumers != null) {
checkAdjust(receivedOk);
}
因此,如果该属性不是
BlockingQueueConsumer
,我们确实会尝试开始一个新的 null
。
您可以在
concurrency
中的 SimpleMessageListenerContainer
属性的 JavaDocs 中找到更多信息。
还有一个可能对您的用例感兴趣:
/**
* Set to true for an exclusive consumer - if true, the concurrency must be 1.
* @param exclusive true for an exclusive consumer.
*/
@Override
public final void setExclusive(boolean exclusive) {
文档中还有一些信息:https://docs.spring.io/spring-amqp/reference/amqp/listener-concurrency.html
您可能也有兴趣将此选项设置为
1
:
/**
* Tell the broker how many messages to send to each consumer in a single request.
* Often this can be set quite high to improve throughput.
* @param prefetchCount the prefetch count
* @see com.rabbitmq.client.Channel#basicQos(int, boolean)
*/
public void setPrefetchCount(int prefetchCount) {