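From the documentation:
Messages in a message group share the same group id, i.e. they have the same group identifier property (JMSXGroupID for JMS, _AMQ_GROUP_ID for the Apache ActiveMQ Artemis Core API).
I can see why the property I originally set via JMSXGroupID becomes _AMQ_GROUP_ID when I browse the messages in the broker, where it has the value product=paper. However, in my @JmsListener-annotated method the _AMQ_GROUP_ID property is missing, and JMSXGroupID comes through as null in the message's headers hash map.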
@JmsListener(destination = "${artemis.destination}", subscription = "${artemis.subscriptionName}",
containerFactory = "containerFactory", concurrency = "15-15")
public void consumeMessage(Message<StatefulSpineEvent<?>> eventMessage)
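So:
- The message is sent to the queue with JMSXGroupID set to 'product=paper'.
- When the message is viewed in the broker, _AMQ_GROUP_ID has the value 'product=paper'.
- When the message is viewed in my application, _AMQ_GROUP_ID is absent and JMSXGroupID has a value of null instead of 'product=paper'.
Is the '=' character invalid, or is there something else that could be causing this? I'm running out of things to try.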
Edit, with new code:
HeaderMapper:
@Component
public class GroupIdMessageMapper extends SimpleJmsHeaderMapper {

    @Override
    public MessageHeaders toHeaders(Message jmsMessage) {
        MessageHeaders messageHeaders = super.toHeaders(jmsMessage);
        Map<String, Object> messageHeadersMap = new HashMap<>(messageHeaders);
        try {
            messageHeadersMap.put("JMSXGroupID", jmsMessage.getStringProperty("_AMQ_GROUP_ID"));
        } catch (JMSException e) {
            e.printStackTrace();
        }
        // can see while debugging that this returns the correct headers
        return new MessageHeaders(messageHeadersMap);
    }
}
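For illustration, the fallback that the mapper above is meant to perform can be sketched without any JMS or Spring types. This is only a rough sketch under stated assumptions: plain maps stand in for the mapped headers and for the raw message properties, and the class and method names (GroupIdHeaderSketch, withGroupId) are hypothetical, not part of Spring or Artemis.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-alone sketch; plain maps replace MessageHeaders and javax.jms.Message.
class GroupIdHeaderSketch {

    static final String JMS_GROUP_ID = "JMSXGroupID";
    static final String CORE_GROUP_ID = "_AMQ_GROUP_ID";

    /**
     * Returns a copy of the mapped headers. If JMSXGroupID is missing or null
     * there, it is filled in from the raw properties, trying JMSXGroupID first
     * and _AMQ_GROUP_ID second.
     */
    static Map<String, Object> withGroupId(Map<String, Object> mappedHeaders,
                                           Map<String, Object> rawProperties) {
        Map<String, Object> result = new HashMap<>(mappedHeaders);
        if (result.get(JMS_GROUP_ID) == null) {
            Object groupId = rawProperties.get(JMS_GROUP_ID);
            if (groupId == null) {
                groupId = rawProperties.get(CORE_GROUP_ID);
            }
            if (groupId != null) {
                result.put(JMS_GROUP_ID, groupId);
            } else {
                // Nothing to fall back on: drop the key rather than keep a null value.
                result.remove(JMS_GROUP_ID);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Object> mapped = new HashMap<>();
        mapped.put(JMS_GROUP_ID, null);            // what the listener currently sees
        Map<String, Object> raw = new HashMap<>();
        raw.put(CORE_GROUP_ID, "product=paper");   // what the broker shows

        System.out.println(withGroupId(mapped, raw).get(JMS_GROUP_ID)); // prints "product=paper"
    }
}
```

The sketch only shows the intended lookup order. Whether the raw message delivered to the listener actually carries either property is exactly what the question is about.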
Listener:
@Component
public class CustomSpringJmsListener {

    protected final Logger LOG = LoggerFactory.getLogger(getClass());

    @JmsListener(destination = "local-queue", subscription = "groupid-example",
            containerFactory = "containerFactory", concurrency = "15-15")
    public void receive(Message<MyObject> message) throws JMSException {
        LOG.info("Received message: " + message);
    }
}
Application code:
@SpringBootApplication
@EnableJms
public class GroupidApplication implements CommandLineRunner {

    private static Logger LOG = LoggerFactory
            .getLogger(GroupidApplication.class);

    @Autowired
    private JmsTemplate jmsTemplate;

    @Autowired
    MessageConverter messageConverter;

    public static void main(String[] args) {
        LOG.info("STARTING THE APPLICATION");
        SpringApplication.run(GroupidApplication.class, args);
        LOG.info("APPLICATION FINISHED");
    }

    @Override
    public void run(String... args) {
        LOG.info("EXECUTING : command line runner");
        jmsTemplate.setPubSubDomain(true);
        /*createAndSendObjectMessage("Message1");
        createAndSendTextMessage("Message2");
        createAndSendTextMessage("Message3");
        createAndSendTextMessage("Message4");
        createAndSendTextMessage("Message5");
        createAndSendTextMessage("Message6");*/
    }

    private void createAndSendTextMessage(String messageBody) {
        jmsTemplate.send("local-queue", session -> {
            Message message = session.createTextMessage(messageBody);
            message.setStringProperty("JMSXGroupID", "product=paper");
            return message;
        });
    }

    // BEANS
    @Bean
    public JmsListenerContainerFactory<?> myContainerFactory(ConnectionFactory connectionFactory,
            DefaultJmsListenerContainerFactoryConfigurer configurer) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        // This provides all boot's default to this factory, including the message converter
        configurer.configure(factory, connectionFactory);
        // You could still override some of Boot's default if necessary.
        factory.setSubscriptionDurable(true);
        factory.setSubscriptionShared(true);
        factory.setMessageConverter(messagingMessageConverter());
        return factory;
    }

    @Bean
    public MessagingMessageConverter messagingMessageConverter() {
        return new MessagingMessageConverter(messageConverter, new GroupIdMessageMapper());
    }
}