__ AMQ_GROUP_ID存在于消息中,但@JmsListener中的JMSXGroupID为空

问题描述 投票:0回答:1

从本文档:

消息组中的消息共享相同的组ID,即它们具有相同的组标识符属性(JMS的JMSXGroupID,Apache ActiveMQ Artemis Core API的_AMQ_GROUP_ID)。

[当我在代理中浏览值为product = paper的消息时,我原来通过JMSXGroupID设置的属性为何变为_AMQ_GROUP_ID。但是,在我的@JmsListener带注释的方法中,我可以看到_AMQ_GROUP_ID属性丢失,并且JMSXGroupID在消息的headers哈希图中作为空值通过。

@JmsListener(destination = "${artemis.destination}", subscription = "${artemis.subscriptionName}",
            containerFactory = "containerFactory", concurrency = "15-15")
public void consumeMessage(Message<StatefulSpineEvent<?>> eventMessage)

所以

  1. 我的Producer应用程序将字符串属性JMSXGroupID设置为'product = paper'后,将消息发送到队列中>]
  2. [当我在Artemis UI中浏览该邮件的标题时,可以看到_AMQ_GROUP_ID的值为'product = paper'
  3. [调试调试侦听器应用程序并查看标题映射时,_AMQ_GROUP_ID不存在,JMSXGroupID的值为null而不是'product = paper'。
  4. 字符'='是否无效或是否有其他原因可能导致此错误?我没办法尝试了。

使用新代码编辑:

HeaderMapper:

@Component
public class GroupIdMessageMapper extends SimpleJmsHeaderMapper {

    @Override
    public MessageHeaders toHeaders(Message jmsMessage) {

        MessageHeaders messageHeaders = super.toHeaders(jmsMessage);

        Map<String, Object> messageHeadersMap = new HashMap<>(messageHeaders);

        try {
            messageHeadersMap.put("JMSXGroupID", jmsMessage.getStringProperty("_AMQ_GROUP_ID"));
        } catch (JMSException e) {
            e.printStackTrace();
        }

        // can see while debugging that this returns the correct headers
        return new MessageHeaders(messageHeadersMap);
    }
}

侦听器:

@Component
public class CustomSpringJmsListener {

    protected final Logger LOG = LoggerFactory.getLogger(getClass());

    @JmsListener(destination = "local-queue", subscription = "groupid-example",
            containerFactory = "containerFactory", concurrency = "15-15")
    public void receive(Message<MyObject> message) throws JMSException {
        LOG.info("Received message: " + message);
    }
}

应用代码:

@SpringBootApplication
@EnableJms
public class GroupidApplication implements CommandLineRunner {

    private static Logger LOG = LoggerFactory
            .getLogger(GroupidApplication.class);

    @Autowired
    private JmsTemplate jmsTemplate;

    @Autowired MessageConverter messageConverter;

    public static void main(String[] args) {
        LOG.info("STARTING THE APPLICATION");
        SpringApplication.run(GroupidApplication.class, args);

        LOG.info("APPLICATION FINISHED");
    }

    @Override
    public void run(String... args) {
        LOG.info("EXECUTING : command line runner");

        jmsTemplate.setPubSubDomain(true);

        /*createAndSendObjectMessage("Message1");
        createAndSendTextMessage("Message2");
        createAndSendTextMessage("Message3");
        createAndSendTextMessage("Message4");
        createAndSendTextMessage("Message5");
        createAndSendTextMessage("Message6");*/
    }

    private void createAndSendTextMessage(String messageBody) {
        jmsTemplate.send("local-queue", session -> {
            Message message = session.createTextMessage(messageBody);

            message.setStringProperty("JMSXGroupID", "product=paper");

            return message;
        });
    }

    // BEANS

    @Bean
    public JmsListenerContainerFactory<?> myContainerFactory(ConnectionFactory connectionFactory,
            DefaultJmsListenerContainerFactoryConfigurer configurer) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        // This provides all boot's default to this factory, including the message converter
        configurer.configure(factory, connectionFactory);
        // You could still override some of Boot's default if necessary.
        factory.setSubscriptionDurable(true);
        factory.setSubscriptionShared(true);
        factory.setMessageConverter(messagingMessageConverter());

        return factory;
    }

    @Bean
    public MessagingMessageConverter messagingMessageConverter() {
        return new MessagingMessageConverter(messageConverter, new GroupIdMessageMapper());
    }
}

从此文档:消息组中的消息共享相同的组ID,即它们具有相同的组标识符属性(JMS的JMSXGroupID,Apache ActiveMQ Artemis Core API的_AMQ_GROUP_ID)。...

jms spring-jms activemq-artemis
1个回答
1
投票
© www.soinside.com 2019 - 2024. All rights reserved.