ActiveMQ Artemis:多播地址传递消息不一致

问题描述 投票:0回答:1

我们正在使用Artemis 2.19.0并分发XML消息。最近,我们发现一些消息在发送到绑定了两个持久多播队列的多播地址时可能会丢失,这两个持久队列都有一个XPATH过滤器。

例如:

  1. 创建一个名为:IN.ADDRESS.FOO
  2. 的多播地址
  3. 在其下创建两个持久队列,命名为:IN.QUEUE1.FOOIN.QUEUE2.FOO
  4. 为两个队列设置 XPATH 过滤器: XPATH '/XML/DATA/Direction[text()="Right"]'
  5. 发送 1000 条相同的匹配 XML 消息到地址 IN.ADDRESS.FOO(使用 JMS 和消息大小 20kb)

不知何故,IN.QUEUE1.FOOIN.QUEUE2.FOO或两者最终都不会收到1000条消息。

我们已经厌倦了从其中一个队列中删除过滤器,然后一切正常,两个队列都将收到 1000 条消息。

  1. 我们使用的是具有一个活动节点和一个备份节点的 HA 架构。
  2. 我们已经检查了 DLQ,但没有发现任何内容,因此不应超出最大重新投递尝试次数。
  3. 即使没有消费者连接,这种行为也会发生。

所以,我的问题是:

  1. 这是否是 XML 过滤器可能比普通过滤器慢得多导致某些消息丢失的原因?
  2. 如果不是的话可能是什么原因?

有什么不清楚的地方请追问。 谢谢

更新1:

版本:

Java: 1.8
Spring-Integration: 5.5.11
Spring-jms: 5.3.19
Artemis: 2.19.0

XML 文件:

<?xml version="1.0" encoding="UTF-8"?>
<beans:beans xmlns="http://www.springframework.org/schema/integration"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xmlns:beans="http://www.springframework.org/schema/beans"
             xmlns:int-xml="http://www.springframework.org/schema/integration/xml"
             xmlns:int-file="http://www.springframework.org/schema/integration/file"
             xmlns:task="http://www.springframework.org/schema/task"
             xmlns:jms="http://www.springframework.org/schema/integration/jms"
             xsi:schemaLocation="http://www.springframework.org/schema/beans
            http://www.springframework.org/schema/beans/spring-beans.xsd
            http://www.springframework.org/schema/integration
            http://www.springframework.org/schema/integration/spring-integration-4.3.xsd
            http://www.springframework.org/schema/integration/jms
            http://www.springframework.org/schema/integration/jms/spring-integration-jms-4.3.xsd
            http://www.springframework.org/schema/integration/xml
            http://www.springframework.org/schema/integration/xml/spring-integration-xml-4.3.xsd
            http://www.springframework.org/schema/integration/file
            http://www.springframework.org/schema/integration/file/spring-integration-file-4.3.xsd
            http://www.springframework.org/schema/task
            http://www.springframework.org/schema/task/spring-task-4.3.xsd">
    <!-- Multicast address -->
    <beans:bean id="topic" class="org.apache.activemq.artemis.jms.client.ActiveMQTopic">
        <beans:constructor-arg value="IN.ADDRESS.FOO"/>
    </beans:bean>

    <!-- Anycast queue -->
    <beans:bean id="queue" class="org.apache.activemq.artemis.jms.client.ActiveMQQueue">
        <beans:constructor-arg value="AQ.QUEUE.FOO"/>
    </beans:bean>

    <channel id="topicChannel">
    </channel>

    <task:executor id="executor" pool-size="2"/>

    <publish-subscribe-channel id="outChannel" task-executor="executor"/>

    <filter id="consumer1" input-channel="outChannel" output-channel="topicChannel" expression="payload.length() > 0"/>

    <filter id="consumer2" input-channel="outChannel" output-channel="topicChannel" expression="payload.length() > 0"/>

    <jms:message-driven-channel-adapter id="queueAdapter" destination="queue" channel="outChannel"
                                        acknowledge="auto" connection-factory="ConnectionFactory"/>

    <jms:outbound-channel-adapter  id="topicAdapter" destination="topic" channel="topicChannel"
                                  connection-factory="ConnectionFactory"/>


</beans:beans>

连接工厂 Bean:

@Bean(name = "ConnectionFactory")
    public SingleConnectionFactory ibConnectionFactory(
            @Value("${artemis.broker-url}") String brokerUrl,
            @Value("${artemis.user}") String username,
            @Value("${artemis.password}") String password) throws JMSException {

        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
        factory.setBrokerURL(brokerUrl);
        factory.setUser(username);
        factory.setPassword(password);
        return new SingleConnectionFactory(factory);
    }

发送程序:

try(ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory(brokerUrl, user, password)) {
            Connection conn = fac.createConnection();
            Session session = conn.createSession();
            MessageProducer producer = session.createProducer(new ActiveMQQueue("AQ.QUEUE.FOO"));
            Message msg = session.createTextMessage("<?xml version=\"1.0\" encoding=\"UTF-8\"?><Root xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\"><Data><PrimaryKey><Key><DetailedIdentity><ATCode>AK</ATCode></DetailedIdentity></Key></PrimaryKey></Data></Root>");

            int count = 0;
            while (count < 1000) {
                System.out.println(count);
                producer.send(msg);
                count ++;
            }
        }

XPATH 过滤器:

XPATH '/Root/Data/PrimaryKey/Key/DetailedIdentity/ATCode[text()="AK"]'

消息示例:

<?xml version="1.0" encoding="UTF-8"?><Root xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"><Data><PrimaryKey><Key><DetailedIdentity><ATCode>AK</ATCode></DetailedIdentity></Key></PrimaryKey></Data></Root>

删除发布订阅通道上的任务执行器属性或删除队列的过滤器之一可以解决问题。

更新2:

一个最小的例子,有 10 个并发任务总共发送 1000 条消息,如果同一地址下的所有队列都有 XPATH 过滤器,那么它不会收到 1000 条消息,但删除其中一个过滤器它可以工作。

import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.jms.client.ActiveMQTopic;

import javax.jms.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;


public class TestRunner {

    public static void main(String[] args) throws Exception {
        String brokerUrl = "(tcp://server1:61616,tcp://server2:61616)?ha=true&reconnectAttempts=-1&retryInterval=100&retryIntervalMultiplier=1.5&maxRetryInterval=6000";
        String user = "admin";
        String password = "admin";

        ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory(brokerUrl, user, password);
        ExecutorService ser = Executors.newFixedThreadPool(10);
        for (int i = 0; i < 10; i++) {
            ser.submit(() -> {
                Connection conn = null;
                try {
                    conn = fac.createConnection();
                    Session session = conn.createSession();
                    MessageProducer producer = session.createProducer(new ActiveMQTopic("IN.ADDRESS.FOO"));
                    Message msg = session.createTextMessage("<?xml version=\"1.0\" encoding=\"UTF-8\"?><Root xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\"><Data><PrimaryKey><Key><DetailedIdentity><ATCode>AK</ATCode></DetailedIdentity></Key></PrimaryKey></Data></Root>");
                    int count = 0;
                    while (count < 100) {
                        System.out.println(count);
                        msg.setStringProperty("MessageId", String.valueOf(count));
                        producer.send(msg);
                        count++;
                    }
                    session.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            });
        }
    }
}
jms activemq-artemis
1个回答
0
投票

感谢您的测试用例。我能够用它来重现您所看到的错误。我打开了 ARTEMIS-4687 并发送了 PR 来解决该问题。该问题将在未来几周内发布的 2.33.0 版本中得到修复。

© www.soinside.com 2019 - 2024. All rights reserved.