我有一个运行良好的 ActiveMQ Artemis 服务器。我现在想在服务器上为使用 pub-sub 的节点应用程序启用 AMQP。虽然我的节点发布订阅服务器能够连接到它,但即使连接有效,它们也不会发布或接收。为什么我的 pub 不发送,为什么我的 sub 不接收?
我正在关注 Github 上 AMQP Rhea 的 publisher 和 subscriber 示例。它们都可以连接到localhost:5672
。下面是我的 ActiveMQ Artemis JMS 服务器实现。请注意,我使用
addAcceptorConfiguration
添加了 2 个配置(一个用于 Artemis,另一个用于 AMQP 的默认端口)。
//this is my jmsserver
/*
* This Java source file was generated by the Gradle 'init' task.
*/
package testSupport.artemis.server;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServers;
import org.apache.activemq.artemis.core.server.QueueConfig;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
public class JMSServer {
private static int qId = 0;
private ActiveMQServer server;
private String errMsg = "";
/**
* Factory method to create an instance of a JMS Server
* @param topics list of topics to add to the server
* @return
*/
public static JMSServer createJMSServer(List<String> topics) {
JMSServer s = new JMSServer();
s.start();
if (topics != null) {
s.setTopics(topics);
}
return s;
}
/**
* Factory method to create an instance of a JMS Server
* @return
*/
public static JMSServer createJMSServer() {
return createJMSServer(null);
}
/**
* Updates the server config with settings required to connect invm or from
* another process on localhost
*
* @param config
*/
public static void updateConfig(Configuration config) {
try {
config.setPersistenceEnabled(false)
.setSecurityEnabled(false)
.addAcceptorConfiguration("tcp", "tcp://localhost:61616")
.addAcceptorConfiguration("amqp", "tcp://localhost:5672");
// SpringBoot may have already created an invm connector at 0, so only add one if it doesnt exist
if (config.getAcceptorConfigurations().stream().noneMatch(poo -> poo.getParams().containsKey("serverId") &&
Integer.valueOf(poo.getParams().get("serverId").toString()) == 0)) {
config.addAcceptorConfiguration("invm", "vm://0");
}
} catch (Exception ex) {
ex.printStackTrace();
}
}
/**
* Default Constructor
*/
public JMSServer() {
try {
Configuration config = new ConfigurationImpl();
updateConfig(config);
server = ActiveMQServers.newActiveMQServer(config);
} catch (Exception ex) {
errMsg = ex + ": " + ex.getMessage();
ex.printStackTrace();
}
}
/**
* Start the JMS Server
* @return
*/
public boolean start() {
boolean success = false;
try {
server.start();
for (int i = 0; i < 50; i++) {
Thread.sleep(100);
if (server.isActive()) {
success = true;
break;
}
}
} catch (Exception ex) {
errMsg = ex + ": " + ex.getMessage();
ex.printStackTrace();
}
return success;
}
/**
* Stop the JMS Server
*/
public void stop() {
try {
server.stop();
} catch (Exception ex) {
errMsg = ex + ": " + ex.getMessage();
ex.printStackTrace();
}
}
/**
* Get the Error Message
* @return
*/
public String getErrMsg() {
return errMsg;
}
/**
* Set a list of topics to add to this server
* @param topics
* @return
*/
public boolean setTopics(List<String> topics) {
boolean success = true;
if (!server.isActive()) {
errMsg = "Topics cannot be set until the server has been started.";
return false;
}
// add the topics
for (String t : topics) {
try {
SimpleString addr = SimpleString.toSimpleString(t);
QueueConfig qcfg = QueueConfig.builderWith(qId, addr, addr).autoCreated(false)
.autoDelete(false)
.durable(true)
.build();
server.getQueueFactory().createQueueWith(qcfg);
server.addAddressInfo(new AddressInfo(addr, RoutingType.MULTICAST));
qId++;
} catch (Exception ex) {
errMsg = ex + ": " + ex.getMessage();
success = false;
}
}
return success;
}
}
更新:
所以评论建议我必须将 artemis-amqp-protocol
添加到类路径中。在 gradle 中,我确保包含
artemis-amqp-protocol
和
artemis-jms-server
以及匹配的版本,如下所示:
dependencies {
implementation 'org.apache.activemq:artemis-jms-server:2.22.0'
implementation 'org.apache.activemq:artemis-amqp-protocol:2.22.0'
// other dependencies here...
}
artemis-amqp-protocol
jar 位于您的类路径中,否则代理将无法支持 AMQP。如果是,那么您应该会看到一条日志消息:
AMQ221043:找到协议模块:[artemis-amqp-protocol]。添加对以下协议的支持:AMQP。然后你会看到类似的东西:
AMQ221020:在 localhost:5672 为协议 [AMQP] 启动 EPOLL 接受器。如果您没有看到这些(或非常接近这些的内容),那么代理将不支持 AMQP。