我知道有一个类似的问题,但在SO中不一样。
我试图了解 JMS 中 MessageProducer 和 MessageConsumer 的幕后情况。使用 ActiveMQ 的实现,我编写了一个简单的 MessageProducer 示例来将消息发送到队列,并编写了一个 MessageConsumer 示例来使用队列中的消息,同时在本地运行 ActiveMQ。
需要Connection#start 方法来将消息发送到队列。具体调试点如下。 Connection#start 触发 ActiveMQSession#start 方法。当调用 Connection#start 时会触发此方法。请参阅以下调试点
org.apache.activemq.ActiveMQSession#start
;
问题是,MessageProducer上并不明确需要Connection#start,但MessageConsumer上需要。但是,对于这两个示例,我们都需要清除资源(session 和 connection)。我意识到,如果我删除生产者上的 Connection#start 方法,代码将执行,调试点不会被触发(甚至不会在引擎盖下),并且我会在队列中看到消息。但是如果我删除消费者上的 Connection#start 方法,代码将不会执行,这就是问题所在,为什么MessageProducer中不需要它并且代码执行成功但在MessageConsumer上需要?另外,为什么我们不使用 Connection#start 作为 MessageProducer,甚至我们需要关闭连接才能刷新资源。看起来代码有味道。
我看到字段 started 是一个
AtomicBoolean
。我不是并发和多线程方面的专家,所以,可能有人可以解释为什么对于 MessageProducer,Connection#start 不是强制性的;
package com.bzdgn.jms.stackoverflow;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
public class JMSSendMessageToQueue {
private static final String ACTIVE_MQ_URL = "tcp://localhost:61616";
public static void main(String[] args) throws JMSException {
String queueName = "test_queue";
String messageContent = "Hello StackOverflow!";
// Connection Factory from ActiveMQ Implementation
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ACTIVE_MQ_URL);
// Get connection from Connection Factory
Connection connection = connectionFactory.createConnection();
// Create session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// Send Message to Queue
Queue queue = session.createQueue(queueName);
TextMessage msg = session.createTextMessage(messageContent);
MessageProducer messageProducer = session.createProducer(queue);
messageProducer.send(msg);
// Clear resources
session.close();
connection.close();
}
}
package com.bzdgn.jms.stackoverflow;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
public class JMSConsumeMessageFromQueue {
private static final String ACTIVE_MQ_URL = "tcp://localhost:61616";
public static void main(String[] args) throws JMSException {
String queueName = "test_queue";
// Connection Factory from ActiveMQ Implementation
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ACTIVE_MQ_URL);
// Get connection from Connection Factory
Connection connection = connectionFactory.createConnection();
// Create session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// Consume Message from the Queue
Queue queue = session.createQueue(queueName);
MessageConsumer messageConsumer = session.createConsumer(queue);
connection.start();
Message message = messageConsumer.receive(500);
if ( message != null ) {
if ( message instanceof TextMessage ) {
TextMessage textMessage = (TextMessage) message;
String messageContent = textMessage.getText();
System.out.println("Message Content: " + messageContent);
}
} else {
System.out.println("No message in the queue: " + queueName);
}
// Clear resources
session.close();
connection.close();
}
}
JDK版本是
1.8
,我正在运行ActiveMQ 5.15.12
并且还对客户端依赖项使用相同的版本;
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-client</artifactId>
<version>5.15.12</version>
</dependency>
此处的行为由 JMS 规范规定。简而言之,
javax.jms.Connection.start()
适用于消费者而不是生产者。它告诉代理开始向与连接关联的消费者传递消息。 Connection
的 JavaDoc 是这样说的:
通常将连接保持在停止模式,直到设置完成(即,直到创建所有消息使用者)。此时,客户端调用连接的启动方法,消息开始到达连接的使用者。此设置约定最大限度地减少了客户端仍在进行自身设置的过程中可能因异步消息传递而导致的任何客户端混乱。
可以立即开始连接,然后进行设置。执行此操作的客户端必须准备好处理异步消息传递,同时它们仍在设置过程中。
start()
方法对生产者没有影响。您正在看到预期的行为。
值得注意的是,如果您使用 JMS 2 中的简化 API,此行为会有所不同。如果您使用
JMSContext
创建 JMSConsumer
,则消息传送将自动开始。需要明确的是,ActiveMQ Classic 并未完全实现 JMS 2,但 ActiveMQ Artemis 可以。