ActiveMQ 消息在通过 java 发送时未收到,但在使用控制台时有效

问题描述 投票:0回答:2

我有一个服务设置为通过 ActiveMQ 发送和接收消息,我计划稍后实现其他消息服务,例如 KAFKA。

当我调用下面的发送和接收函数时,接收函数停止,我不得不手动关闭程序。但是,当我使用 AMQ 网络控制台发送消息时,接收功能有效。

import org.apache.activemq.ActiveMQConnectionFactory;
import arc.ipc.IService;

public class ActiveMQService<K, V> implements IService<K, V> {
    private String brokerAddress;
    private ConnectionFactory connectionFactory;
    private Connection connection;
    private Session session;

    public ActiveMQService(String brokerAddress) {
        this.brokerAddress = brokerAddress;
    }

    @Override
    public void send(String topic,V value) throws JMSException {
        Destination destination = session.createTopic(topic);
        MessageProducer producer = session.createProducer(destination);
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        TextMessage message = session.createTextMessage(value.toString());
        producer.send(message);
    }

    @Override
    public String receive(String topic) throws JMSException {
        Destination destination = session.createTopic(topic);
        MessageConsumer consumer = session.createConsumer(destination);
        Message message = consumer.receive();
        if (message instanceof TextMessage) {
            TextMessage textMessage = (TextMessage) message;
            return textMessage.getText();
        }
        return null;
    }

    @Override
    public void connect() throws JMSException {
        connectionFactory = new ActiveMQConnectionFactory(brokerAddress);
        connection = connectionFactory.createConnection();
        connection.start();
        session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
    }

    @Override
    public void disconnect() throws JMSException {
        if (session != null) {
            session.close();
        }
        if (connection != null) {
            connection.close();
        }
    }
}
java jms activemq
2个回答
0
投票

您正在将 JMS 会话创建为 transacted,即:

session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);

但是,您在发送消息后永远不会打电话给

commit()


0
投票

我认为问题在于您正在将消息发送到 JMS topicthen 在该主题上创建订阅者以接收您刚刚发送的消息。 JMS 主题遵循正常的发布/订阅语义,这意味着订阅必须存在before发送消息。

此外,当消费者没有收到消息时,它只是永远阻塞,即:

Message message = consumer.receive();

您可能会考虑使用 at timeout 并记录错误或抛出异常,例如:

Message message = consumer.receive(500);
if (message == null) {
   throw new IllegalStateException("Did not receive message!");
}
© www.soinside.com 2019 - 2024. All rights reserved.