我面临一个奇怪的问题。我有 2 个消费者,它们在两个单独的线程上运行。生产者在 Topic 中发布消息时,有时两个消费者之一会因为没有消息发送到该消费者而被卡住。但值得注意的是,这种情况并不是每次都会发生。有时我可以看到消费者都收到了消息并且流程按预期结束。以下是我的消费者(Tommy、Harry)和生产者(WeatherChannel)。
Tommy.java
public class Tommy implements Runnable {
private static final Logger logger = LoggerFactory.getLogger(Tommy.class);
public void receiveMessage() throws NamingException {
// Create a new initial context, which loads from jndi.properties file
Context context = new InitialContext();
// Lookup an existing Destination which is a topic in our example
Topic topic = (Topic)context.lookup("jms/test/topic");
//Object in a try-with-resources block the close method will be called automatically at the end of the block.
try(ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
JMSContext jmsContext = connectionFactory.createContext()) {
//Create consumer and receive String message on the fly (i.e. without need to type caste to Message etc.)
String messageReceived = jmsContext.createConsumer(topic).receiveBody(String.class);
logger.info("Message received by Tommy >>> {}", messageReceived);
}
}
// public static void main(String[] args) throws NamingException, InterruptedException {
// receiveMessage();
// }
@Override
public void run() {
try {
receiveMessage();
} catch (NamingException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
Harry.java
public class Harry implements Runnable {
private static final Logger logger = LoggerFactory.getLogger(Harry.class);
public static void receiveMessage() throws NamingException {
// Create a new initial context, which loads from jndi.properties file
Context context = new InitialContext();
// Lookup an existing Destination which is a topic in our example
Topic topic = (Topic)context.lookup("jms/test/topic");
//Object in a try-with-resources block the close method will be called automatically at the end of the block.
try(ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
JMSContext jmsContext = connectionFactory.createContext()) {
//Create consumer and receive String message on the fly (i.e. without need to type caste to Message etc.)
String messageReceived = jmsContext.createConsumer(topic).receiveBody(String.class);
logger.info("Message received by Harry >>> {}", messageReceived);
}
}
// public static void main(String[] args) throws NamingException, InterruptedException {
// receiveMessage();
// }
@Override
public void run() {
try {
receiveMessage();
} catch (NamingException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
WeatherChannel.java
public class WeatherChannel {
private static final Logger logger = LoggerFactory.getLogger(WeatherChannel.class);
public void broadcastMessage() throws NamingException, JMSException, InterruptedException {
// Create a new initial context, which loads from jndi.properties file
Context context = new InitialContext();
// Lookup an existing Destination which is a topic in our example
Topic topic = (Topic)context.lookup("jms/test/topic");
//Object in a try-with-resources block the close method will be called automatically at the end of the block.
try(ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
JMSContext jmsContext = connectionFactory.createContext()) {
//Create producer and send message on the fly
jmsContext.createProducer().send(topic, "Today's weather at Kolkata is pleasant with max temp 27 C");
logger.info("Message sent successfully by producer");
}
}
}
这就是我从主方法调用消费者和生产者的方式。
new Thread(new Tommy()).start();
new Thread(new Harry()).start();
new WeatherChannel().broadcastMessage();
最后,我从各自的主方法中触发了两个消费者并摆脱了线程。这次我发现我没有看到问题,并且两个消费者都成功接收了消息。有人可以指出我哪里出错了吗?
阿尔忒弥斯2.32.0
这很可能是由于生产者线程发送消息时消费者线程尚未完全启动和运行的结果。 JMS 主题不会为发送消息时不在线的消费者保留消息,除非存在现有的持久主题订阅。
主题是一种广播机制,而不是队列,您需要确保您想要接收给定消息的任何消费者在发送之前都在线。在您的简单示例中,这可以通过在创建对象时将锁存器或其他可等待资源传递给对象并在发送之前等待该锁存器来完成。线程可以在调用接收之前但在创建消费者之后执行闩锁倒计时。