我的最终目标是开发一个应用程序,在启动时检索在某个主题上发布的最后一条消息;而且,无论该消息是在一秒钟前发布的,还是一个月前发布的,甚至更早......
我的感觉是我必须使用持久主题。出于测试目的,我尝试开发一个可以执行我想要的操作的示例:
public void myTest() throws JMSException, InterruptedException {
latch = new CountDownLatch(1);
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
factory.setBrokerURL("tcp://150.151.179.31:61616");
Connection connection1 = factory.createConnection();
Session session1 = connection1.createSession(false /* no transaction */, Session.CLIENT_ACKNOWLEDGE);
connection1.start();
Topic topic1 = session1.createTopic("test.persistence");
MessageConsumer consumer1 = session1.createConsumer(topic1, null /* no filter */, true /* prevent receiving message published by current session */);
consumer1.setMessageListener(this);
//////////////////////////////////////////////////
// if I do not close the session, my sample works,
// if I close it, it doesn't
//
// session1.close();
// connection1.close();
//////////////////////////////////////////////////
Connection connection2 = factory.createConnection();
Session session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
connection2.start();
Topic topic2 = session2.createTopic("test.persistence");
MessageProducer producer2 = session2.createProducer(topic2);
MapMessage mapMessage2 = session2.createMapMessage();
mapMessage2.setStringProperty("key", "value");
producer2.send(mapMessage2, PERSISTENT, DEFAULT_PRIORITY, DEFAULT_TIME_TO_LIVE);
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
}
@Override
public void onMessage(Message message) {
latch.countDown();
}
这个示例使用ActiveMQ,但我在EMS上也有同样的问题。
我创建了两个会话:
PERSISTENT
主题行为是:
请帮助我理解我的问题。
您所看到的是预期的行为。 JMS 主题语义规定,在没有任何订阅的情况下发送消息时将被丢弃。这正是您关闭订阅者然后发送消息时发生的情况。
如果您希望在订阅者关闭后保留订阅,那么您需要使用持久订阅。
但是,您应该记住,在创建持久订阅之前发送的任何消息仍将被丢弃。您必须使用 ActiveMQ“Classic”中的“retroactive Consumer”功能来获取在创建订阅之前发送的任何消息。当然,除了 ActiveMQ 之外,这不适用于任何其他 JMS 提供程序。
最后,JMS 目标(即队列和主题)本身不是持久的或非持久的。这就是 JMS 消息的品质。