如何在 Spring Boot 中启动/停止@JmsListener

问题描述 投票:0回答:1

我正在尝试启动/停止从 JMS 队列检索消息。我正在尝试使用

JmsListenerEndpointRegistry
访问
stop()
start()
方法,但我无法使其工作。 以下是该问题的相关类:

配置:

@Configuration
@EnableJms
public class JmsConfig {

    @Value("${spring.activemq.broker-url}")
    String BROKER_URL_1;
    @Value("${spring.activemq.user}")
    String BROKER_USERNAME_1;
    @Value("${spring.activemq.password}")
    String BROKER_PASSWORD_1;

    @Value("${spring.activemq.broker-url-2}")
    String BROKER_URL_2;
    @Value("${spring.activemq.user-2}")
    String BROKER_USERNAME_2;
    @Value("${spring.activemq.password-2}")
    String BROKER_PASSWORD_2;



    @Bean
    public JmsListenerEndpointRegistry jmsListenerEndpointRegistry() {
        return new JmsListenerEndpointRegistry();
    }

    @Bean
    public ActiveMQConnectionFactory connectionFactory1(){
        ActiveMQConnectionFactory connectionFactory = new  ActiveMQConnectionFactory();
        connectionFactory.setTrustAllPackages(true);
        connectionFactory.setBrokerURL(BROKER_URL_1);
        connectionFactory.setPassword(BROKER_USERNAME_1);
        connectionFactory.setUserName(BROKER_PASSWORD_1);
        return connectionFactory;
    }

    @Bean
    public ActiveMQConnectionFactory connectionFactory2(){
        ActiveMQConnectionFactory connectionFactory = new  ActiveMQConnectionFactory();
        connectionFactory.setTrustAllPackages(true);
        connectionFactory.setBrokerURL(BROKER_URL_2);
        connectionFactory.setPassword(BROKER_USERNAME_2);
        connectionFactory.setUserName(BROKER_PASSWORD_2);
        return connectionFactory;
    }

    @Bean
    public JmsTemplate jmsTemplate1() {
        JmsTemplate template = new JmsTemplate();
        template.setConnectionFactory(connectionFactory1());
        template.setMessageConverter(messageConverter());
        template.setPubSubDomain(true);
        template.setDestinationResolver(destinationResolver());
        template.setDeliveryPersistent(true);
        return template;
    }

    @Bean
    public JmsTemplate jmsTemplate2(){
        JmsTemplate template = new JmsTemplate();
        template.setConnectionFactory(connectionFactory2());
        template.setMessageConverter(messageConverter());
        template.setPubSubDomain(true);
        template.setDestinationResolver(destinationResolver());
        template.setDeliveryPersistent(true);
        return template;
    }

    @Bean
    public MessageConverter messageConverter() {
        MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
        converter.setTargetType(MessageType.TEXT);
        converter.setObjectMapper(objectMapper());
        return converter;
    }

    @Bean
    public ObjectMapper objectMapper() {
        ObjectMapper mapper = new ObjectMapper();
        mapper.registerModule(new JavaTimeModule());
        mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
        return mapper;
    }

    @Bean(name = "jmsListenerContainerFactory")
    public JmsListenerContainerFactory<?> jmsListenerContainerFactory() {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory1());
        return factory;
    }

    @Bean(name = "jmsListenerContainerFactory2")
    public JmsListenerContainerFactory<?> jmsListenerContainerFactory2() {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory2());
        return factory;
    }

    @Bean
    DynamicDestinationResolver destinationResolver() {
        return new DynamicDestinationResolver() {
            @Override
            public Destination resolveDestinationName(Session session, String destinationName, boolean pubSubDomain) throws JMSException {
                if(destinationName.endsWith("Topic")) {
                    pubSubDomain = true;
                }
                else {
                    pubSubDomain = false;
                }
                return super.resolveDestinationName(session,destinationName,pubSubDomain);
            }
        };
    }

}

听众:

@Component
@RequiredArgsConstructor
public class ReceiverService{

    @Value("${spring.activemq.queue}")
    private String queueName;
    @Value("${spring.activemq.queue-2}")
    private String queueName2;

    private final MessageService messageService;
    private final JmsListenerManager jmsListenerManager;
    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(3000);
    private final Set<String> receivedMessages1 = new HashSet<>();
    private final Set<String> receivedMessages2 = new HashSet<>();

    @JmsListener(destination = "${spring.activemq.queue}", id = "receiver1", containerFactory = "jmsListenerContainerFactory")
    public void receive(final Message jsonMessage) throws JMSException {
        String messageBody = jsonMessage.getBody(String.class);
        receivedMessages1.add(messageBody);
        System.out.println("Received message from Queue 1: " + messageBody);
    }

    @JmsListener(destination = "${spring.activemq.queue-2}", id = "receiver2",containerFactory = "jmsListenerContainerFactory2")
    public void receive2(final Message message) throws JMSException {
        String messageBody = message.getBody(String.class);
        receivedMessages2.add(messageBody);
        System.out.println("Received message from Queue 2: " + messageBody);
    }

听众经理:

@Component
public class JmsListenerManager {

    private final JmsListenerEndpointRegistry registry;

    @Autowired
    public JmsListenerManager(@Qualifier("jmsListenerEndpointRegistry") JmsListenerEndpointRegistry endpointRegistry) {
        this.registry = endpointRegistry;
    }

        public void stopAll() {
            MessageListenerContainer listenerContainer = registry.getListenerContainer("receiver1");
            listenerContainer.stop();
        }

        public void startAll() {
            registry.start();
        }
}

stop 方法抛出以下异常。

java.lang.NullPointerException: Cannot invoke "org.springframework.jms.listener.MessageListenerContainer.stop()" because "listenerContainer" is null
    at com.nextbyn.api.service.activemq.JmsListenerManager.stopAll(JmsListenerManager.java:21) ~[classes/:na]

知道我该如何解决吗?

java spring-jms
1个回答
0
投票

我刚刚找到了解决方案,问题是它没有正确引用必要的bean,这是我的解决方案。

@Component
public class JmsListenerManager {

    @Autowired
    ApplicationContext context;

    public void stopAll() {
        JmsListenerEndpointRegistry customRegistry = context.getBean("org.springframework.jms.config.internalJmsListenerEndpointRegistry",JmsListenerEndpointRegistry.class);
        customRegistry.stop();
        System.out.println("Stopped all listeners");
    }

    public void startAll() {
        JmsListenerEndpointRegistry customRegistry = context.getBean("org.springframework.jms.config.internalJmsListenerEndpointRegistry",JmsListenerEndpointRegistry.class);
        customRegistry.start();
        System.out.println("Started all listeners");
    }
}
© www.soinside.com 2019 - 2024. All rights reserved.