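I am trying to create a static cluster of two Spring Boot applications using embedded HornetQ servers. One application/server will handle external events and generate messages to be sent to a message queue. The other application/server will listen on the message queue and process incoming messages. Because the link between the two applications is unreliable, each application will use only local/InVM clients to produce/consume messages on its own server, and rely on the clustering functionality to forward the messages to the queue on the other server in the cluster.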
I am using a HornetQConfigurationCustomizer to customize the embedded HornetQ server, because by default it only comes with an InVMConnectorFactory.
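I have created a couple of sample applications to illustrate this setup. Throughout this example, "ServerSend" refers to the server that produces messages, and "ServerReceive" refers to the server that consumes them.
The pom.xml for both applications contains: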
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-hornetq</artifactId>
</dependency>
<dependency>
    <groupId>org.hornetq</groupId>
    <artifactId>hornetq-jms-server</artifactId>
</dependency>
DemoHornetqServerSendApplication:
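import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.core.config.ClusterConnectionConfiguration;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory;
import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory;
import org.hornetq.core.remoting.impl.netty.TransportConstants;
import org.hornetq.core.settings.impl.AddressSettings;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jms.hornetq.HornetQConfigurationCustomizer;
import org.springframework.context.annotation.Bean;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;

@SpringBootApplication
@EnableScheduling
public class DemoHornetqServerSendApplication {

    @Autowired
    private JmsTemplate jmsTemplate;

    @Value("${spring.hornetq.embedded.queues}")
    private String testQueue;

    public static void main(String[] args) {
        SpringApplication.run(DemoHornetqServerSendApplication.class, args);
    }

    // Sends a timestamp message to the local (InVM) queue every 5 seconds.
    @Scheduled(fixedRate = 5000)
    private void sendMessage() {
        String message = "Timestamp from Server: " + System.currentTimeMillis();
        System.out.println("Sending message: " + message);
        jmsTemplate.convertAndSend(testQueue, message);
    }

    @Bean
    public HornetQConfigurationCustomizer hornetCustomizer() {
        return new HornetQConfigurationCustomizer() {
            @Override
            public void customize(Configuration configuration) {
                String serverSendConnectorName = "server-send-connector";
                String serverReceiveConnectorName = "server-receive-connector";
                Map<String, TransportConfiguration> connectorConf = configuration.getConnectorConfigurations();

                // Connector and acceptor for this server (localhost:5445).
                Map<String, Object> params = new HashMap<String, Object>();
                params.put(TransportConstants.HOST_PROP_NAME, "localhost");
                params.put(TransportConstants.PORT_PROP_NAME, "5445");
                TransportConfiguration tc = new TransportConfiguration(NettyConnectorFactory.class.getName(), params);
                connectorConf.put(serverSendConnectorName, tc);

                Set<TransportConfiguration> acceptors = configuration.getAcceptorConfigurations();
                tc = new TransportConfiguration(NettyAcceptorFactory.class.getName(), params);
                acceptors.add(tc);

                // Connector pointing at the other cluster member (localhost:5446).
                params = new HashMap<String, Object>();
                params.put(TransportConstants.HOST_PROP_NAME, "localhost");
                params.put(TransportConstants.PORT_PROP_NAME, "5446");
                tc = new TransportConfiguration(NettyConnectorFactory.class.getName(), params);
                connectorConf.put(serverReceiveConnectorName, tc);

                // Static cluster: the only known peer is ServerReceive.
                List<String> staticConnectors = new ArrayList<String>();
                staticConnectors.add(serverReceiveConnectorName);
                ClusterConnectionConfiguration conf = new ClusterConnectionConfiguration(
                        "my-cluster",            // name
                        "jms",                   // address
                        serverSendConnectorName, // connector name
                        500,                     // retry interval
                        true,                    // duplicate detection
                        true,                    // forward when no consumers
                        1,                       // max hops
                        1000000,                 // confirmation window size
                        staticConnectors,
                        true                     // allow direct connections only
                );
                configuration.getClusterConfigurations().add(conf);

                // Redistribute messages immediately for all addresses.
                AddressSettings setting = new AddressSettings();
                setting.setRedistributionDelay(0);
                configuration.getAddressesSettings().put("#", setting);
            }
        };
    }
}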
application.properties (ServerSend):
spring.hornetq.mode=embedded
spring.hornetq.embedded.enabled=true
spring.hornetq.embedded.queues=jms.testqueue
spring.hornetq.embedded.cluster-password=password
DemoHornetqServerReceiveApplication:
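import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.core.config.ClusterConnectionConfiguration;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.remoting.impl.netty.NettyAcceptorFactory;
import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory;
import org.hornetq.core.remoting.impl.netty.TransportConstants;
import org.hornetq.core.settings.impl.AddressSettings;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jms.hornetq.HornetQConfigurationCustomizer;
import org.springframework.context.annotation.Bean;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.jms.core.JmsTemplate;

@SpringBootApplication
@EnableJms
public class DemoHornetqServerReceiveApplication {

    @Autowired
    private JmsTemplate jmsTemplate;

    @Value("${spring.hornetq.embedded.queues}")
    private String testQueue;

    public static void main(String[] args) {
        SpringApplication.run(DemoHornetqServerReceiveApplication.class, args);
    }

    // Consumes messages from the local (InVM) queue.
    @JmsListener(destination = "${spring.hornetq.embedded.queues}")
    public void receiveMessage(String message) {
        System.out.println("Received message: " + message);
    }

    @Bean
    public HornetQConfigurationCustomizer hornetCustomizer() {
        return new HornetQConfigurationCustomizer() {
            @Override
            public void customize(Configuration configuration) {
                String serverSendConnectorName = "server-send-connector";
                String serverReceiveConnectorName = "server-receive-connector";
                Map<String, TransportConfiguration> connectorConf = configuration.getConnectorConfigurations();

                // Connector and acceptor for this server (localhost:5446).
                Map<String, Object> params = new HashMap<String, Object>();
                params.put(TransportConstants.HOST_PROP_NAME, "localhost");
                params.put(TransportConstants.PORT_PROP_NAME, "5446");
                TransportConfiguration tc = new TransportConfiguration(NettyConnectorFactory.class.getName(), params);
                connectorConf.put(serverReceiveConnectorName, tc);

                Set<TransportConfiguration> acceptors = configuration.getAcceptorConfigurations();
                tc = new TransportConfiguration(NettyAcceptorFactory.class.getName(), params);
                acceptors.add(tc);

                // Connector pointing at the other cluster member (localhost:5445).
                params = new HashMap<String, Object>();
                params.put(TransportConstants.HOST_PROP_NAME, "localhost");
                params.put(TransportConstants.PORT_PROP_NAME, "5445");
                tc = new TransportConfiguration(NettyConnectorFactory.class.getName(), params);
                connectorConf.put(serverSendConnectorName, tc);

                // Static cluster: the only known peer is ServerSend.
                List<String> staticConnectors = new ArrayList<String>();
                staticConnectors.add(serverSendConnectorName);
                ClusterConnectionConfiguration conf = new ClusterConnectionConfiguration(
                        "my-cluster",               // name
                        "jms",                      // address
                        serverReceiveConnectorName, // connector name
                        500,                        // retry interval
                        true,                       // duplicate detection
                        true,                       // forward when no consumers
                        1,                          // max hops
                        1000000,                    // confirmation window size
                        staticConnectors,
                        true                        // allow direct connections only
                );
                configuration.getClusterConfigurations().add(conf);

                // Redistribute messages immediately for all addresses.
                AddressSettings setting = new AddressSettings();
                setting.setRedistributionDelay(0);
                configuration.getAddressesSettings().put("#", setting);
            }
        };
    }
}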
application.properties (ServerReceive):
spring.hornetq.mode=embedded
spring.hornetq.embedded.enabled=true
spring.hornetq.embedded.queues=jms.testqueue
spring.hornetq.embedded.cluster-password=password
After starting both applications, the log output shows this:
ServerSend:
2015-04-09 11:11:58.471 INFO 7536 --- [ main] org.hornetq.core.server : HQ221000: live server is starting with configuration HornetQ Configuration (clustered=true,backup=false,sharedStore=true,journalDirectory=C:\Users\****\AppData\Local\Temp\hornetq-data/journal,bindingsDirectory=data/bindings,largeMessagesDirectory=data/largemessages,pagingDirectory=data/paging)
2015-04-09 11:11:58.501 INFO 7536 --- [ main] org.hornetq.core.server : HQ221045: libaio is not available, switching the configuration into NIO
2015-04-09 11:11:58.595 INFO 7536 --- [ main] org.hornetq.core.server : HQ221043: Adding protocol support CORE
2015-04-09 11:11:58.720 INFO 7536 --- [ main] org.hornetq.core.server : HQ221003: trying to deploy queue jms.queue.jms.testqueue
2015-04-09 11:11:59.568 INFO 7536 --- [ main] org.hornetq.core.server : HQ221020: Started Netty Acceptor version 4.0.13.Final localhost:5445
2015-04-09 11:11:59.593 INFO 7536 --- [ main] org.hornetq.core.server : HQ221007: Server is now live
2015-04-09 11:11:59.593 INFO 7536 --- [ main] org.hornetq.core.server : HQ221001: HornetQ Server version 2.4.5.FINAL (Wild Hornet, 124) [c139929d-d90f-11e4-ba2e-e58abf5d6944]
ServerReceive:
2015-04-09 11:12:04.401 INFO 4528 --- [ main] org.hornetq.core.server : HQ221000: live server is starting with configuration HornetQ Configuration (clustered=true,backup=false,sharedStore=true,journalDirectory=C:\Users\****\AppData\Local\Temp\hornetq-data/journal,bindingsDirectory=data/bindings,largeMessagesDirectory=data/largemessages,pagingDirectory=data/paging)
2015-04-09 11:12:04.410 INFO 4528 --- [ main] org.hornetq.core.server : HQ221045: libaio is not available, switching the configuration into NIO
2015-04-09 11:12:04.520 INFO 4528 --- [ main] org.hornetq.core.server : HQ221043: Adding protocol support CORE
2015-04-09 11:12:04.629 INFO 4528 --- [ main] org.hornetq.core.server : HQ221003: trying to deploy queue jms.queue.jms.testqueue
2015-04-09 11:12:05.545 INFO 4528 --- [ main] org.hornetq.core.server : HQ221020: Started Netty Acceptor version 4.0.13.Final localhost:5446
2015-04-09 11:12:05.578 INFO 4528 --- [ main] org.hornetq.core.server : HQ221007: Server is now live
2015-04-09 11:12:05.578 INFO 4528 --- [ main] org.hornetq.core.server : HQ221001: HornetQ Server version 2.4.5.FINAL (Wild Hornet, 124) [c139929d-d90f-11e4-ba2e-e58abf5d6944]
I see clustered=true in both outputs. If I remove the cluster configuration from the HornetQConfigurationCustomizer, this shows false instead, so the configuration must be having some effect.
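Another detail in the two startup logs that may be worth comparing: the bracketed identifier at the end of each HQ221001 line is identical (c139929d-d90f-11e4-ba2e-e58abf5d6944), and both servers report the same journalDirectory. Whether this has anything to do with the missing forwarding is an assumption I have not verified. Below is a minimal sketch, using only java.util.regex, that pulls both values out of log lines so the two servers can be compared. The class name StartupLogCheck and its helper methods are hypothetical names made up for illustration, and the sample lines are abbreviated copies of the log output above.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class StartupLogCheck {

    // Bracketed identifier at the end of an HQ221001 "server version" line.
    private static final Pattern IDENTIFIER =
            Pattern.compile("HQ221001:.*\\[([0-9a-fA-F-]+)\\]");

    // Value of journalDirectory=... in an HQ221000 "live server is starting" line.
    private static final Pattern JOURNAL_DIR =
            Pattern.compile("journalDirectory=([^,)]+)");

    // Returns capture group 1 of the first match, if the pattern matches at all.
    private static Optional<String> firstGroup(Pattern pattern, String logLine) {
        Matcher matcher = pattern.matcher(logLine);
        return matcher.find() ? Optional.of(matcher.group(1)) : Optional.empty();
    }

    public static Optional<String> identifier(String logLine) {
        return firstGroup(IDENTIFIER, logLine);
    }

    public static Optional<String> journalDirectory(String logLine) {
        return firstGroup(JOURNAL_DIR, logLine);
    }

    public static void main(String[] args) {
        // Abbreviated copies of the ServerSend and ServerReceive log lines.
        String sendStarting = "HQ221000: live server is starting with configuration HornetQ Configuration "
                + "(clustered=true,backup=false,sharedStore=true,"
                + "journalDirectory=C:\\Users\\****\\AppData\\Local\\Temp\\hornetq-data/journal,"
                + "bindingsDirectory=data/bindings)";
        String receiveStarting = sendStarting; // the two servers log the same configuration line
        String sendVersion = "[ main] org.hornetq.core.server : HQ221001: HornetQ Server version "
                + "2.4.5.FINAL (Wild Hornet, 124) [c139929d-d90f-11e4-ba2e-e58abf5d6944]";
        String receiveVersion = sendVersion; // same identifier appears in both logs

        boolean sameIdentifier = identifier(sendVersion).equals(identifier(receiveVersion));
        boolean sameJournal = journalDirectory(sendStarting).equals(journalDirectory(receiveStarting));

        System.out.println("same identifier: " + sameIdentifier);
        System.out.println("same journal directory: " + sameJournal);
    }
}
```

Run against the lines above, this reports that both servers share the identifier and the journal directory.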
Now, ServerSend shows this in the console output:
Sending message: Timestamp from Server: 1428574324910
Sending message: Timestamp from Server: 1428574329899
Sending message: Timestamp from Server: 1428574334904
However, ServerReceive shows nothing.
It appears that the messages are not being forwarded from ServerSend to ServerReceive.
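One thing worth ruling out is a mismatch between the cluster connection address ("jms") and the queue. The startup log shows the configured queue jms.testqueue being deployed as jms.queue.jms.testqueue, which reflects HornetQ's jms.queue. prefix for JMS queues, and a cluster connection applies to addresses that start with its configured address value. The sketch below only models that prefix rule with plain string operations; ClusterAddressCheck and its methods are hypothetical names for illustration and are not part of the HornetQ API.

```java
public class ClusterAddressCheck {

    // Prefix under which a JMS queue is deployed, as seen in the
    // "trying to deploy queue jms.queue.jms.testqueue" log line.
    static final String JMS_QUEUE_PREFIX = "jms.queue.";

    // Core address for a JMS queue name such as "jms.testqueue".
    public static String coreAddress(String jmsQueueName) {
        return JMS_QUEUE_PREFIX + jmsQueueName;
    }

    // Simplified model of the cluster connection address rule:
    // the connection covers addresses that start with the configured value.
    public static boolean coveredByCluster(String clusterAddress, String coreAddress) {
        return coreAddress.startsWith(clusterAddress);
    }

    public static void main(String[] args) {
        String address = coreAddress("jms.testqueue");
        System.out.println(address);                          // jms.queue.jms.testqueue
        System.out.println(coveredByCluster("jms", address)); // true
    }
}
```

Under this model the address setting covers the queue, so the address prefix on its own does not look like the cause.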
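I did some more testing by creating two further Spring Boot applications (ClientSend and ClientReceive), which do not embed a HornetQ server and instead connect to a "native" server. The pom.xml for both client applications contains: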
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-hornetq</artifactId>
</dependency>
DemoHornetqClientSendApplication:
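import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;

@SpringBootApplication
@EnableScheduling
public class DemoHornetqClientSendApplication {

    @Autowired
    private JmsTemplate jmsTemplate;

    @Value("${queue}")
    private String testQueue;

    public static void main(String[] args) {
        SpringApplication.run(DemoHornetqClientSendApplication.class, args);
    }

    // Sends a timestamp message to the remote server every 5 seconds.
    @Scheduled(fixedRate = 5000)
    private void sendMessage() {
        String message = "Timestamp from Client: " + System.currentTimeMillis();
        System.out.println("Sending message: " + message);
        jmsTemplate.convertAndSend(testQueue, message);
    }
}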
application.properties (ClientSend):
spring.hornetq.mode=native
spring.hornetq.host=localhost
spring.hornetq.port=5446
queue=jms.testqueue
DemoHornetqClientReceiveApplication:
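import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.jms.core.JmsTemplate;

@SpringBootApplication
@EnableJms
public class DemoHornetqClientReceiveApplication {

    @Autowired
    private JmsTemplate jmsTemplate;

    @Value("${queue}")
    private String testQueue;

    public static void main(String[] args) {
        SpringApplication.run(DemoHornetqClientReceiveApplication.class, args);
    }

    // Consumes messages from the queue on the remote server.
    @JmsListener(destination = "${queue}")
    public void receiveMessage(String message) {
        System.out.println("Received message: " + message);
    }
}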
application.properties (ClientReceive):
spring.hornetq.mode=native
spring.hornetq.host=localhost
spring.hornetq.port=5445
queue=jms.testqueue
Now the console shows this:
ServerReceive:
Received message: Timestamp from Client: 1428574966630
Received message: Timestamp from Client: 1428574971600
Received message: Timestamp from Client: 1428574976595
ClientReceive:
Received message: Timestamp from Server: 1428574969436
Received message: Timestamp from Server: 1428574974438
Received message: Timestamp from Server: 1428574979446
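If I leave ServerSend running for a while and then start ClientReceive, it also receives all the messages queued up to that point. This shows that the messages are not simply disappearing somewhere or being consumed from somewhere else.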
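For completeness' sake, I also pointed ClientSend at ServerSend and ClientReceive at ServerReceive, to see whether there is some problem with clustering and the InVM clients. Again there was no output indicating that any message was received in either ClientReceive or ServerReceive.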
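So it appears that message delivery between each embedded broker and its directly connected external clients works fine, but no messages are forwarded between the brokers in the cluster.
So, after all this, the big question: what is wrong with this setup that causes messages not to be forwarded within the cluster?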