如何使用Spring JmsListener从ActiveMQ手动确认消息

问题描述 投票:0回答:1

我在Spring的JmsListener中使用ActiveMQ(与JMS一起使用)。我可以使用ActiveMQ队列中的消息,但它使用的是AUTO_ACKNOWLEDGE。如何设置CLIENT_ACKNOWLEDGE,以便仅在该确认后才能使用其他消息。

@Bean
public ActiveMQConnectionFactory connectionFactory() {
    ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();
    activeMQConnectionFactory.setTransactedIndividualAck(true);
    activeMQConnectionFactory.setUserName(mqUserName);
    activeMQConnectionFactory.setPassword(mqPassword);
    activeMQConnectionFactory.setBrokerURL(mqUrl);
    return activeMQConnectionFactory;
}

@Bean
public JmsListenerContainerFactory myFactory(ConnectionFactory connectionFactory, DefaultJmsListenerContainerFactoryConfigurer configurer) {

    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    factory.setErrorHandler(t -> {
        logger.info("An error has occurred in the transaction");
        logger.error(t.getCause().getMessage());
    });

    configurer.configure(factory, connectionFactory);
    factory.setConcurrency("4");

    // You could still override some of Boot's default if necessary.
    return factory;
}

@Bean
public MessageConverter jacksonJmsMessageConverter() {
    MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
    converter.setTargetType(MessageType.TEXT);
    converter.setTypeIdPropertyName("_type");
    return converter;
}

@JmsListener(destination = "QUEUE_1", containerFactory = "myFactory", concurrency = "2")
public void receiveImgGenerationMessage(String transaction) {
    logger.info("message received in queue " + transaction);
    //I will call other api to process the message and do some operation 
    //after the message is processed 
    //I have to Acknowledge the message is processed
    //so that i can consume the other message for process.
}

// jmsTemplate bean
public void sendmessage() {
    for (int i =0 ; i < 10 ; < i++) { 
        jmsTemplate.convertAndSend("QUEUE_1", i);
    }
}
java jms activemq spring-jms
1个回答
0
投票

您应该在您的计算机上使用setSessionAcknowledgeMode方法org.springframework.jms.config.DefaultJmsListenerContainerFactory实例,例如:

@Bean
public JmsListenerContainerFactory myFactory(ConnectionFactory connectionFactory, DefaultJmsListenerContainerFactoryConfigurer configurer) {

    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    factory.setErrorHandler(t -> {
        logger.info("An error has occurred in the transaction");
        logger.error(t.getCause().getMessage());
    });

    factory.setSessionAcknowledgeMode(javax.jms.Session.CLIENT_ACKNOWLEDGE);

    configurer.configure(factory, connectionFactory);
    factory.setConcurrency("4");

    // You could still override some of Boot's default if necessary.
    return factory;
}
© www.soinside.com 2019 - 2024. All rights reserved.