我有一个 Artemis 服务器,运行良好。我现在想在服务器上为使用 pub-sub 的节点应用程序启用 AMQP。虽然我的节点发布订阅服务器能够连接到它,但即使连接有效,它们也不会发布或接收。为什么我的 pub 不发送/为什么我的 sub 不接收?
我正在关注 Github 上 AMQP Rhea 的 publisher 和 5 个 subscriber 示例。它们都可以连接到 localhost:5672。
下面是我的 Artemis JMS 服务器实现。请注意,我使用 addAcceptorConfiguration 添加了 2 个配置(一个用于 Artemis,另一个用于 AMQP 的默认端口)。
//this is my jmsserver
/*
* This Java source file was generated by the Gradle 'init' task.
*/
package testSupport.artemis.server;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServers;
import org.apache.activemq.artemis.core.server.QueueConfig;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
public class JMSServer {
private static int qId = 0;
private ActiveMQServer server;
private String errMsg = "";
/**
* Factory method to create an instance of a JMS Server
* @param topics list of topics to add to the server
* @return
*/
public static JMSServer createJMSServer(List<String> topics) {
JMSServer s = new JMSServer();
s.start();
if (topics != null) {
s.setTopics(topics);
}
return s;
}
/**
* Factory method to create an instance of a JMS Server
* @return
*/
public static JMSServer createJMSServer() {
return createJMSServer(null);
}
/**
* Updates the server config with settings required to connect invm or from
* another process on localhost
*
* @param config
*/
public static void updateConfig(Configuration config) {
try {
config.setPersistenceEnabled(false)
.setSecurityEnabled(false)
.addAcceptorConfiguration("tcp", "tcp://localhost:61616")
.addAcceptorConfiguration("amqp", "tcp://localhost:5672");
// SpringBoot may have already created an invm connector at 0, so only add one if it doesnt exist
if (config.getAcceptorConfigurations().stream().noneMatch(poo -> poo.getParams().containsKey("serverId") &&
Integer.valueOf(poo.getParams().get("serverId").toString()) == 0)) {
config.addAcceptorConfiguration("invm", "vm://0");
}
} catch (Exception ex) {
ex.printStackTrace();
}
}
/**
* Default Constructor
*/
public JMSServer() {
try {
Configuration config = new ConfigurationImpl();
updateConfig(config);
server = ActiveMQServers.newActiveMQServer(config);
} catch (Exception ex) {
errMsg = ex + ": " + ex.getMessage();
ex.printStackTrace();
}
}
/**
* Start the JMS Server
* @return
*/
public boolean start() {
boolean success = false;
try {
server.start();
for (int i = 0; i < 50; i++) {
Thread.sleep(100);
if (server.isActive()) {
success = true;
break;
}
}
} catch (Exception ex) {
errMsg = ex + ": " + ex.getMessage();
ex.printStackTrace();
}
return success;
}
/**
* Stop the JMS Server
*/
public void stop() {
try {
server.stop();
} catch (Exception ex) {
errMsg = ex + ": " + ex.getMessage();
ex.printStackTrace();
}
}
/**
* Get the Error Message
* @return
*/
public String getErrMsg() {
return errMsg;
}
/**
* Set a list of topics to add to this server
* @param topics
* @return
*/
public boolean setTopics(List<String> topics) {
boolean success = true;
if (!server.isActive()) {
errMsg = "Topics cannot be set until the server has been started.";
return false;
}
// add the topics
for (String t : topics) {
try {
SimpleString addr = SimpleString.toSimpleString(t);
QueueConfig qcfg = QueueConfig.builderWith(qId, addr, addr).autoCreated(false)
.autoDelete(false)
.durable(true)
.build();
server.getQueueFactory().createQueueWith(qcfg);
server.addAddressInfo(new AddressInfo(addr, RoutingType.MULTICAST));
qId++;
} catch (Exception ex) {
errMsg = ex + ": " + ex.getMessage();
success = false;
}
}
return success;
}
}
config.setPersistenceEnabled(false)
.setSecurityEnabled(false)
.addAcceptorConfiguration("tcp", "tcp://localhost:61616")
.addAcceptorConfiguration("amqp", "tcp://localhost:5672?protocols=CORE,AMQP");