我正在使用 Openshift 4 部署多个微服务,这些微服务使用发布-订阅消息传递模型通过 AMQ 代理连接。但是,当我将 pod 数量增加到 2 时,我遇到了一个问题,即所有 pod 都在使用相同的消息,而不仅仅是一个消息。
有人可以建议如何配置下面的 java 代码或 AMQ 代理以确保消息只被发布-订阅模型中的一个 pod 使用吗?是否有任何我应该注意的特定设置或需要对我的配置进行的更改?谢谢。
配置:
@Bean
public JmsListenerContainerFactory<DefaultMessageListenerContainer> jmsListenerContainerPublisherFactory(ConnectionFactory connectionFactory,
DefaultJmsListenerContainerFactoryConfigurer configurer) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setErrorHandler(throwable -> {
log.info("An error has occurred in the transaction: " + throwable.getMessage());
log.error("Error: ", throwable);
});
configurer.configure(factory, connectionFactory);
factory.setPubSubDomain(true);
return factory;
}
听众:
@JmsListener(destination = "${queue.dummyObject}",
containerFactory = "jmsListenerContainerPublisherFactory")
public void onConsumePublishedMessage(String message) throws JsonProcessingException {
DummyObjectDTO dummyObjectDTO = mapper.readValue(message, DummyObjectDTO.class);
LOG.info(" Received onConsumePublishedMessage message : " + dummyObjectDTO);
}
制作人:
private <T> T sendFeatured(T value, String queue, boolean publish, Selector selector) {
*
*
jmsTemplate.setPubSubDomain(true);
jmsTemplate.convertAndSend(queue, objectAsJson);
*
*
return value;
}
由于您使用的是 ActiveMQ Artemis,因此您可以使用 JMS 2 中引入的共享订阅功能。在配置您的
DefaultJmsListenerContainerFactory
时使用setSubscriptionShared
:
factory.setPubSubDomain(true);
factory.setSubscriptionShared(true);
return factory;
然后在你的监听器上你可以设置订阅名称,例如:
@JmsListener(destination = "${queue.dummyObject}",
subscription = "mySubscriptionName",
containerFactory = "jmsListenerContainerPublisherFactory")
每 set 订阅者都需要使用唯一的订阅名称,以便只有其中一个订阅者收到消息,而不是所有订阅者。
您对 Pub/Sub 主题的期望与实际情况或功能不符。
如果只希望单个接收者处理一条消息,那么您应该使用 queue 和 a 命令,而不是事件。
在多个订阅者执行相同任务的情况下使用事件和 Pub/Sub(主题)时,您必须确保事件处理是幂等的。