我正在尝试实现一种功能,使我拥有的侦听器类可以订阅/取消订阅JMS主题。经过一番研究,似乎没有明确的方法可以做到这一点,我提出了两种解决方案:
jmsTemplate.receiveAndConvert(topicName)
(可能将阻塞操作本身委派给工作池)。订阅/取消订阅就像从列表中删除主题名称一样简单。具有一个工厂类,该类将使用以下方法为应用程序需要订阅的每个主题构建一个新的侦听器:
public MessageListenerContainer createListener(String topic) {
DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
container.setConnectionFactory(connectionFactory());
container.setDestinationName(topic);
container.setMessageListener(new MyListenerClass());
return container;
}
第二种选择对我来说似乎更优雅,但是我不确定听众的生命周期。我浏览了一些有关Spring Boot的jms和activemq模块的资源,并注意到DefaultMessageListenerContainer
具有方法initialize()
和start()
,尽管我不确定如何/是否需要使用它们,这是我唯一的方法找到以这种方式构建的MessageListenerContainer
作为Bean
声明。另外,在退订某个主题时,因此想要销毁与之关联的侦听器容器,除了调用stop(callback)
方法之外,还需要做更多的事情吗?
我对JMS / ActiveMQ及其Spring集成的理解是否正确,因为没有更简单的方法可以实现这一目标?我的方法正确吗?
只要您,恕我直言
PooledConnectionFactory
一个]initialise()
和start()
,并取消订阅stop()
使用第二种方法,一切都应该很好
要在运行时注册新的JmsListenerEndpoint
,您必须完成2个步骤
MessageListener
服务@Service
public class CustomMessageListener implements MessageListener {
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("[Custom message listener] " + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
@Service
public class MessageListenersService {
@Autowired
private JmsListenerEndpointRegistry registry;
@Autowired
@Qualifier("containerFactory")
private DefaultJmsListenerContainerFactory factory;
public void registerEndpoint(String queueNameToListen, MessageListener listener) {
SimpleJmsListenerEndpoint endpoint = new SimpleJmsListenerEndpoint();
endpoint.setId("ep-42"); // ID is mandatory
endpoint.setMessageListener(listener);
endpoint.setDestination(queueNameToListen);
registry.registerListenerContainer(endpoint, factory, true);
}
}
private static final String CUSTOM_DESTINATION = "queue-42";
@Autowired
MessageListenersService messageListenersService;
@Autowired
CustomMessageListener customMessageListener;
@Autowired
JmsTemplate jmsTemplate;
@PostConstruct
public void createCustomListener() throws InterruptedException {
messageListenersService.registerEndpoint(CUSTOM_DESTINATION, customMessageListener);
jmsTemplate.send(CUSTOM_DESTINATION, session -> session.createTextMessage("hello world"));
}