我正在尝试启动/停止从 JMS 队列检索消息。我正在尝试使用
JmsListenerEndpointRegistry
访问 stop()
和 start()
方法,但我无法使其工作。
以下是该问题的相关类:
配置:
@Configuration
@EnableJms
public class JmsConfig {
@Value("${spring.activemq.broker-url}")
String BROKER_URL_1;
@Value("${spring.activemq.user}")
String BROKER_USERNAME_1;
@Value("${spring.activemq.password}")
String BROKER_PASSWORD_1;
@Value("${spring.activemq.broker-url-2}")
String BROKER_URL_2;
@Value("${spring.activemq.user-2}")
String BROKER_USERNAME_2;
@Value("${spring.activemq.password-2}")
String BROKER_PASSWORD_2;
@Bean
public JmsListenerEndpointRegistry jmsListenerEndpointRegistry() {
return new JmsListenerEndpointRegistry();
}
@Bean
public ActiveMQConnectionFactory connectionFactory1(){
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
connectionFactory.setTrustAllPackages(true);
connectionFactory.setBrokerURL(BROKER_URL_1);
connectionFactory.setPassword(BROKER_USERNAME_1);
connectionFactory.setUserName(BROKER_PASSWORD_1);
return connectionFactory;
}
@Bean
public ActiveMQConnectionFactory connectionFactory2(){
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
connectionFactory.setTrustAllPackages(true);
connectionFactory.setBrokerURL(BROKER_URL_2);
connectionFactory.setPassword(BROKER_USERNAME_2);
connectionFactory.setUserName(BROKER_PASSWORD_2);
return connectionFactory;
}
@Bean
public JmsTemplate jmsTemplate1() {
JmsTemplate template = new JmsTemplate();
template.setConnectionFactory(connectionFactory1());
template.setMessageConverter(messageConverter());
template.setPubSubDomain(true);
template.setDestinationResolver(destinationResolver());
template.setDeliveryPersistent(true);
return template;
}
@Bean
public JmsTemplate jmsTemplate2(){
JmsTemplate template = new JmsTemplate();
template.setConnectionFactory(connectionFactory2());
template.setMessageConverter(messageConverter());
template.setPubSubDomain(true);
template.setDestinationResolver(destinationResolver());
template.setDeliveryPersistent(true);
return template;
}
@Bean
public MessageConverter messageConverter() {
MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
converter.setTargetType(MessageType.TEXT);
converter.setObjectMapper(objectMapper());
return converter;
}
@Bean
public ObjectMapper objectMapper() {
ObjectMapper mapper = new ObjectMapper();
mapper.registerModule(new JavaTimeModule());
mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
return mapper;
}
@Bean(name = "jmsListenerContainerFactory")
public JmsListenerContainerFactory<?> jmsListenerContainerFactory() {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory1());
return factory;
}
@Bean(name = "jmsListenerContainerFactory2")
public JmsListenerContainerFactory<?> jmsListenerContainerFactory2() {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory2());
return factory;
}
@Bean
DynamicDestinationResolver destinationResolver() {
return new DynamicDestinationResolver() {
@Override
public Destination resolveDestinationName(Session session, String destinationName, boolean pubSubDomain) throws JMSException {
if(destinationName.endsWith("Topic")) {
pubSubDomain = true;
}
else {
pubSubDomain = false;
}
return super.resolveDestinationName(session,destinationName,pubSubDomain);
}
};
}
}
听众:
@Component
@RequiredArgsConstructor
public class ReceiverService{
@Value("${spring.activemq.queue}")
private String queueName;
@Value("${spring.activemq.queue-2}")
private String queueName2;
private final MessageService messageService;
private final JmsListenerManager jmsListenerManager;
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(3000);
private final Set<String> receivedMessages1 = new HashSet<>();
private final Set<String> receivedMessages2 = new HashSet<>();
@JmsListener(destination = "${spring.activemq.queue}", id = "receiver1", containerFactory = "jmsListenerContainerFactory")
public void receive(final Message jsonMessage) throws JMSException {
String messageBody = jsonMessage.getBody(String.class);
receivedMessages1.add(messageBody);
System.out.println("Received message from Queue 1: " + messageBody);
}
@JmsListener(destination = "${spring.activemq.queue-2}", id = "receiver2",containerFactory = "jmsListenerContainerFactory2")
public void receive2(final Message message) throws JMSException {
String messageBody = message.getBody(String.class);
receivedMessages2.add(messageBody);
System.out.println("Received message from Queue 2: " + messageBody);
}
听众经理:
@Component
public class JmsListenerManager {
private final JmsListenerEndpointRegistry registry;
@Autowired
public JmsListenerManager(@Qualifier("jmsListenerEndpointRegistry") JmsListenerEndpointRegistry endpointRegistry) {
this.registry = endpointRegistry;
}
public void stopAll() {
MessageListenerContainer listenerContainer = registry.getListenerContainer("receiver1");
listenerContainer.stop();
}
public void startAll() {
registry.start();
}
}
stop 方法抛出以下异常。
java.lang.NullPointerException: Cannot invoke "org.springframework.jms.listener.MessageListenerContainer.stop()" because "listenerContainer" is null
at com.nextbyn.api.service.activemq.JmsListenerManager.stopAll(JmsListenerManager.java:21) ~[classes/:na]
知道我该如何解决吗?
我刚刚找到了解决方案,问题是它没有正确引用必要的bean,这是我的解决方案。
@Component
public class JmsListenerManager {
@Autowired
ApplicationContext context;
public void stopAll() {
JmsListenerEndpointRegistry customRegistry = context.getBean("org.springframework.jms.config.internalJmsListenerEndpointRegistry",JmsListenerEndpointRegistry.class);
customRegistry.stop();
System.out.println("Stopped all listeners");
}
public void startAll() {
JmsListenerEndpointRegistry customRegistry = context.getBean("org.springframework.jms.config.internalJmsListenerEndpointRegistry",JmsListenerEndpointRegistry.class);
customRegistry.start();
System.out.println("Started all listeners");
}
}