I have a simple producer and consumer application that uses ActiveMQ Classic 6.0.1.
Here is the producer:
ConnectionFactory connectionFactory = new org.apache.activemq.ActiveMQConnectionFactory(BROKER_URL);
// Create a Connection
Connection connection = connectionFactory.createConnection("admin", "admin");
connection.start();
// Create a Session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// Create the destination (a queue, despite the constant's name)
Destination destination = session.createQueue(TOPIC_NAME);
// Create a MessageProducer
MessageProducer producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
// Create a TextMessage and publish it
TextMessage message = session.createTextMessage("Hello, JMS Topic!");
producer.send(message);
System.out.println("Message sent");
// Clean up
session.close();
connection.close();
The message does arrive on the queue.
Now, I have two ways of consuming the message. The first:
public class MyListener implements MessageListener{
private static final Logger logger = Logger.getLogger(WebServer.class.getName());
private static final String BROKER_URL = "tcp://messaging:61616";
private static final String TOPIC_NAME = "TEST.FOO";
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL);
// Create a connection
Connection connection = connectionFactory.createConnection("admin", "admin");
connection.start();
// Create a session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// Create a destination (topic or queue)
Destination destination = session.createQueue(TOPIC_NAME);
// Create a consumer
MessageConsumer consumer = session.createConsumer(destination);
consumer.setMessageListener(new MyListener());
// INFINITE LOOP
}
@Override
public void onMessage(Message message) {
System.out.println(message);
}
}
With this approach the message is printed to the console.
The second approach, which uses JMSContext, does not work:
public class MyListener implements MessageListener{
private static final Logger logger = Logger.getLogger(WebServer.class.getName());
private static final String BROKER_URL = "tcp://messaging:61616";
private static final String TOPIC_NAME = "TEST.FOO";
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL);
server.setHandler(context);
server.start();
try(JMSContext context = connectionFactory.createContext("admin", "admin", AUTO_ACKNOWLEDGE)) {
JMSConsumer consumer = context.createConsumer(context.createQueue(TOPIC_NAME));
consumer.setMessageListener(new MyListener());
context.start();
}
// INFINITE LOOP
}
@Override
public void onMessage(Message message) {
System.out.println("lol");
}
}
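The two applications are similar: both run on embedded Jetty and use JPA (MariaDB/EclipseLink), Jersey JAX-RS, and Weld-SE as the CDI implementation.
Any idea why the second approach (which I believe is the JMS 2.0 way) does not work?
I believe ActiveMQ Classic 6.0.1 supports JMS 2.0.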
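You're relying on the ActiveMQ Classic JMS client implementation to keep your application from simply exiting after you invoke setMessageListener. This works with the JMS 1.1 API implementation but apparently not with the JMS 2.0 implementation. To be clear, the JMS specification does not guarantee this behavior, so you shouldn't rely on it. Instead, write your application so that it doesn't exit after invoking setMessageListener, e.g.:
public class MyListener implements MessageListener{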
private static final Logger logger = Logger.getLogger(WebServer.class.getName());
private static final String BROKER_URL = "tcp://messaging:61616";
private static final String TOPIC_NAME = "TEST.FOO";
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL);
server.setHandler(context);
server.start();
try(JMSContext context = connectionFactory.createContext("admin", "admin", AUTO_ACKNOWLEDGE)) {
JMSConsumer consumer = context.createConsumer(context.createQueue(TOPIC_NAME));
consumer.setMessageListener(new MyListener());
context.start();
Thread.sleep(30000); // keep the context open for 30 seconds; it is closed when the try block exits
}
}
@Override
public void onMessage(Message message) {
System.out.println("lol");
}
}
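The fixed sleep above is only for demonstration. A minimal sketch of the same idea with a CountDownLatch follows, using only the Java standard library so it runs without a broker. FakeContext is a hypothetical stand-in for JMSContext (an assumption for illustration, not an ActiveMQ or JMS class); it delivers messages on a background thread until it is closed. The sketch also illustrates that a try-with-resources block closes its resource as soon as the block exits, so any waiting has to happen inside the block.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

/** Stdlib-only sketch; FakeContext is a hypothetical stand-in for JMSContext. */
final class KeepAliveSketch {

    /** Hypothetical stand-in: calls the listener every 50 ms on a background thread until closed. */
    static final class FakeContext implements AutoCloseable {
        private final ScheduledExecutorService dispatcher =
                Executors.newSingleThreadScheduledExecutor();

        void setMessageListener(Consumer<String> listener) {
            dispatcher.scheduleAtFixedRate(
                    () -> listener.accept("hello"), 50, 50, TimeUnit.MILLISECONDS);
        }

        @Override
        public void close() {
            // Cancels pending deliveries and stops the background thread.
            dispatcher.shutdownNow();
        }
    }

    /** The try block ends right after the listener is registered, so the context closes at once. */
    static int closeImmediately() throws InterruptedException {
        AtomicInteger received = new AtomicInteger();
        try (FakeContext context = new FakeContext()) {
            context.setMessageListener(msg -> received.incrementAndGet());
        } // context.close() runs here, before the first scheduled delivery
        Thread.sleep(200); // waiting out here does not help: the context is already closed
        return received.get();
    }

    /** Blocks inside the try block until the expected number of messages has arrived. */
    static int waitInsideTry(int expected) throws InterruptedException {
        AtomicInteger received = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(expected);
        try (FakeContext context = new FakeContext()) {
            context.setMessageListener(msg -> {
                received.incrementAndGet();
                done.countDown();
            });
            // The context stays open while the main thread waits (at most 5 seconds).
            done.await(5, TimeUnit.SECONDS);
        }
        return received.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("closed immediately: " + closeImmediately());
        System.out.println("waited inside try:  " + waitInsideTry(3));
    }
}
```

In a long-running consumer, the latch would more typically be released from a shutdown hook (Runtime.getRuntime().addShutdownHook) than by a message count, so the process keeps consuming until it is asked to stop.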
Finally, it's worth noting that ActiveMQ Classic does not yet fully implement JMS 2. If you want a complete implementation, you must use ActiveMQ Artemis.