Spring Boot + ActiveMQ以编程方式快速订阅主题

问题描述 投票:1回答:2

我正在尝试实现一种功能,使我拥有的侦听器类可以订阅/取消订阅JMS主题。经过一番研究,似乎没有明确的方法可以做到这一点,我提出了两种解决方案:

  1. 具有一个侦听器类,该类包含String主题名称的列表,并定期遍历所有应订阅的主题并在每个主题上运行阻塞jmsTemplate.receiveAndConvert(topicName)(可能将阻塞操作本身委派给工作池)。订阅/取消订阅就像从列表中删除主题名称一样简单。
  2. 具有一个工厂类,该类将使用以下方法为应用程序需要订阅的每个主题构建一个新的侦听器:

    public MessageListenerContainer createListener(String topic) {
      DefaultMessageListenerContainer container = new DefaultMessageListenerContainer();
      container.setConnectionFactory(connectionFactory());
      container.setDestinationName(topic);
      container.setMessageListener(new MyListenerClass());
      return container;
    

    }

第二种选择对我来说似乎更优雅,但是我不确定听众的生命周期。我浏览了一些有关Spring Boot的jms和activemq模块的资源,并注意到DefaultMessageListenerContainer具有方法initialize()start(),尽管我不确定如何/是否需要使用它们,这是我唯一的方法找到以这种方式构建的MessageListenerContainer作为Bean声明。另外,在退订某个主题时,因此想要销毁与之关联的侦听器容器,除了调用stop(callback)方法之外,还需要做更多的事情吗?

我对JMS / ActiveMQ及其Spring集成的理解是否正确,因为没有更简单的方法可以实现这一目标?我的方法正确吗?

java spring spring-boot activemq spring-jms
2个回答
2
投票

只要您,恕我直言

  • 从spring获取连接工厂(not一个PooledConnectionFactory一个]
  • 订阅时正确呼叫initialise()start(),并取消订阅stop()
  • 不希望在异常情况下重新传递消息

使用第二种方法,一切都应该很好


0
投票

要在运行时注册新的JmsListenerEndpoint,您必须完成2个步骤

1创建自定义MessageListener服务

@Service
public class CustomMessageListener implements MessageListener {
    public void onMessage(Message message) {
        TextMessage textMessage = (TextMessage) message;
        try {
            System.out.println("[Custom message listener] " + textMessage.getText());
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

2使用JmsListenerEndpointRegistry注册新端点

@Service
public class MessageListenersService {
    @Autowired
    private JmsListenerEndpointRegistry registry;

    @Autowired
    @Qualifier("containerFactory")
    private DefaultJmsListenerContainerFactory factory;

    public void registerEndpoint(String queueNameToListen, MessageListener listener) {
        SimpleJmsListenerEndpoint endpoint = new SimpleJmsListenerEndpoint();
        endpoint.setId("ep-42"); // ID is mandatory
        endpoint.setMessageListener(listener);
        endpoint.setDestination(queueNameToListen);
        registry.registerListenerContainer(endpoint, factory, true);
    }
}

使用它

    private static final String CUSTOM_DESTINATION = "queue-42";

    @Autowired
    MessageListenersService messageListenersService;
    @Autowired
    CustomMessageListener customMessageListener;
    @Autowired
    JmsTemplate jmsTemplate;

    @PostConstruct
    public void createCustomListener() throws InterruptedException {
        messageListenersService.registerEndpoint(CUSTOM_DESTINATION, customMessageListener);
        jmsTemplate.send(CUSTOM_DESTINATION, session -> session.createTextMessage("hello world"));
    }
© www.soinside.com 2019 - 2024. All rights reserved.