如何配置 Artemis ActiveMQ 以使用 AMQP 1.0 和其他 Java 协议

问题描述 投票:0回答:1

我有一个 Artemis 服务器,运行良好。我现在想在服务器上为使用 pub-sub 的节点应用程序启用 AMQP。虽然我的节点发布订阅服务器能够连接到它,但即使连接有效,它们也不会发布或接收。为什么我的 pub 不发送/为什么我的 sub 不接收?

我正在关注 Github 上 AMQP Rhea 的 publisher 和 5 个 subscriber 示例。它们都可以连接到 localhost:5672。

下面是我的 Artemis JMS 服务器实现。请注意,我使用 addAcceptorConfiguration 添加了 2 个配置(一个用于 Artemis,另一个用于 AMQP 的默认端口)。

//this is my jmsserver /* * This Java source file was generated by the Gradle 'init' task. */ package testSupport.artemis.server; import java.util.List; import java.util.stream.Collectors; import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServers; import org.apache.activemq.artemis.core.server.QueueConfig; import org.apache.activemq.artemis.core.server.impl.AddressInfo; public class JMSServer { private static int qId = 0; private ActiveMQServer server; private String errMsg = ""; /** * Factory method to create an instance of a JMS Server * @param topics list of topics to add to the server * @return */ public static JMSServer createJMSServer(List<String> topics) { JMSServer s = new JMSServer(); s.start(); if (topics != null) { s.setTopics(topics); } return s; } /** * Factory method to create an instance of a JMS Server * @return */ public static JMSServer createJMSServer() { return createJMSServer(null); } /** * Updates the server config with settings required to connect invm or from * another process on localhost * * @param config */ public static void updateConfig(Configuration config) { try { config.setPersistenceEnabled(false) .setSecurityEnabled(false) .addAcceptorConfiguration("tcp", "tcp://localhost:61616") .addAcceptorConfiguration("amqp", "tcp://localhost:5672"); // SpringBoot may have already created an invm connector at 0, so only add one if it doesnt exist if (config.getAcceptorConfigurations().stream().noneMatch(poo -> poo.getParams().containsKey("serverId") && Integer.valueOf(poo.getParams().get("serverId").toString()) == 0)) { config.addAcceptorConfiguration("invm", "vm://0"); } } catch (Exception ex) { ex.printStackTrace(); } } /** * Default Constructor */ public JMSServer() { try { Configuration config = new ConfigurationImpl(); updateConfig(config); server = ActiveMQServers.newActiveMQServer(config); } catch (Exception ex) { errMsg = ex + ": " + ex.getMessage(); ex.printStackTrace(); } } /** * Start the JMS Server * @return */ public boolean start() { boolean success = false; try { server.start(); for (int i = 0; i < 50; i++) { Thread.sleep(100); if (server.isActive()) { success = true; break; } } } catch (Exception ex) { errMsg = ex + ": " + ex.getMessage(); ex.printStackTrace(); } return success; } /** * Stop the JMS Server */ public void stop() { try { server.stop(); } catch (Exception ex) { errMsg = ex + ": " + ex.getMessage(); ex.printStackTrace(); } } /** * Get the Error Message * @return */ public String getErrMsg() { return errMsg; } /** * Set a list of topics to add to this server * @param topics * @return */ public boolean setTopics(List<String> topics) { boolean success = true; if (!server.isActive()) { errMsg = "Topics cannot be set until the server has been started."; return false; } // add the topics for (String t : topics) { try { SimpleString addr = SimpleString.toSimpleString(t); QueueConfig qcfg = QueueConfig.builderWith(qId, addr, addr).autoCreated(false) .autoDelete(false) .durable(true) .build(); server.getQueueFactory().createQueueWith(qcfg); server.addAddressInfo(new AddressInfo(addr, RoutingType.MULTICAST)); qId++; } catch (Exception ex) { errMsg = ex + ": " + ex.getMessage(); success = false; } } return success; } }
    
java amqp activemq-artemis
1个回答
0
投票
您需要为接受方配置启用的协议,因为它们不会根据您提供的名称推断其角色。您可以为每个接受器启用多个协议或根据您的用例限制它。

config.setPersistenceEnabled(false) .setSecurityEnabled(false) .addAcceptorConfiguration("tcp", "tcp://localhost:61616") .addAcceptorConfiguration("amqp", "tcp://localhost:5672?protocols=CORE,AMQP");
    
© www.soinside.com 2019 - 2024. All rights reserved.