如何配置ActiveMQ Artemis以通过Java使用AMQP 1.0和其他协议

问题描述 投票:0回答:1

我有一个运行良好的 ActiveMQ Artemis 服务器。我现在想在服务器上为使用 pub-sub 的节点应用程序启用 AMQP。虽然我的节点发布订阅服务器能够连接到它,但即使连接有效,它们也不会发布或接收。为什么我的 pub 不发送,为什么我的 sub 不接收?

我正在关注 Github 上 AMQP Rhea 的 publishersubscriber 示例。它们都可以连接到localhost:5672

下面是我的 ActiveMQ Artemis JMS 服务器实现。请注意,我使用

addAcceptorConfiguration

 添加了 2 个配置(一个用于 Artemis,另一个用于 AMQP 的默认端口)。

//this is my jmsserver /* * This Java source file was generated by the Gradle 'init' task. */ package testSupport.artemis.server; import java.util.List; import java.util.stream.Collectors; import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServers; import org.apache.activemq.artemis.core.server.QueueConfig; import org.apache.activemq.artemis.core.server.impl.AddressInfo; public class JMSServer { private static int qId = 0; private ActiveMQServer server; private String errMsg = ""; /** * Factory method to create an instance of a JMS Server * @param topics list of topics to add to the server * @return */ public static JMSServer createJMSServer(List<String> topics) { JMSServer s = new JMSServer(); s.start(); if (topics != null) { s.setTopics(topics); } return s; } /** * Factory method to create an instance of a JMS Server * @return */ public static JMSServer createJMSServer() { return createJMSServer(null); } /** * Updates the server config with settings required to connect invm or from * another process on localhost * * @param config */ public static void updateConfig(Configuration config) { try { config.setPersistenceEnabled(false) .setSecurityEnabled(false) .addAcceptorConfiguration("tcp", "tcp://localhost:61616") .addAcceptorConfiguration("amqp", "tcp://localhost:5672"); // SpringBoot may have already created an invm connector at 0, so only add one if it doesnt exist if (config.getAcceptorConfigurations().stream().noneMatch(poo -> poo.getParams().containsKey("serverId") && Integer.valueOf(poo.getParams().get("serverId").toString()) == 0)) { config.addAcceptorConfiguration("invm", "vm://0"); } } catch (Exception ex) { ex.printStackTrace(); } } /** * Default Constructor */ public JMSServer() { try { Configuration config = new ConfigurationImpl(); updateConfig(config); server = ActiveMQServers.newActiveMQServer(config); } catch (Exception ex) { errMsg = ex + ": " + ex.getMessage(); ex.printStackTrace(); } } /** * Start the JMS Server * @return */ public boolean start() { boolean success = false; try { server.start(); for (int i = 0; i < 50; i++) { Thread.sleep(100); if (server.isActive()) { success = true; break; } } } catch (Exception ex) { errMsg = ex + ": " + ex.getMessage(); ex.printStackTrace(); } return success; } /** * Stop the JMS Server */ public void stop() { try { server.stop(); } catch (Exception ex) { errMsg = ex + ": " + ex.getMessage(); ex.printStackTrace(); } } /** * Get the Error Message * @return */ public String getErrMsg() { return errMsg; } /** * Set a list of topics to add to this server * @param topics * @return */ public boolean setTopics(List<String> topics) { boolean success = true; if (!server.isActive()) { errMsg = "Topics cannot be set until the server has been started."; return false; } // add the topics for (String t : topics) { try { SimpleString addr = SimpleString.toSimpleString(t); QueueConfig qcfg = QueueConfig.builderWith(qId, addr, addr).autoCreated(false) .autoDelete(false) .durable(true) .build(); server.getQueueFactory().createQueueWith(qcfg); server.addAddressInfo(new AddressInfo(addr, RoutingType.MULTICAST)); qId++; } catch (Exception ex) { errMsg = ex + ": " + ex.getMessage(); success = false; } } return success; } }

更新: 所以评论建议我必须将 artemis-amqp-protocol

 添加到类路径中。在 gradle 中,我确保包含 
artemis-amqp-protocol
artemis-jms-server
 以及匹配的版本,如下所示:

dependencies { implementation 'org.apache.activemq:artemis-jms-server:2.22.0' implementation 'org.apache.activemq:artemis-amqp-protocol:2.22.0' // other dependencies here... }
    
java amqp activemq-artemis
1个回答
1
投票
您必须确保

artemis-amqp-protocol

 jar 位于您的类路径中,否则代理将无法支持 AMQP。如果是,那么您应该会看到一条日志消息:

AMQ221043:找到协议模块:[artemis-amqp-protocol]。添加对以下协议的支持:AMQP。

然后你会看到类似的东西:

AMQ221020:在 localhost:5672 为协议 [AMQP] 启动 EPOLL 接受器。

如果您没有看到这些(或非常接近这些的内容),那么代理将不支持 AMQP。

© www.soinside.com 2019 - 2024. All rights reserved.