ActiveMQ JMSXGroupID无法按预期工作

问题描述 投票:0回答:1

我使用此脚本在本地启动ActiveMQ:

docker run -p 61616:61616 -p 61614:61614 -p 8161:8161 -it -v conf:/opt/activemq/conf -v data:/opt/activemq/data rmohr/activemq

它将启动AMQ版本5.15.6。我通过Websockets(v1.2)与带有STOMP的AMQ连接。通过“测试”队列中的Web控制台,我创建了两个消息,它们均具有相同的组“ test_grp”。我启动两个进程,每个进程运行相同的逻辑:

  • 连接
  • 使用activemq.prefetchSize: 1ack: client-individual标头订阅“ / queue / test”
  • 在新消息上等待5秒并发送确认消息

两个进程都立即接收消息,而第二条消息应该由同一进程接收,或者至少应该在第一个消息被确认后才接收。

此外,如果我仅使用activemq.prefetchSize: 2标头启动一个进程/订阅,则此进程将立即收到两个消息,而不是在确认第一个消息之后依次接收两个消息。

因此,似乎JMSXGroupID对消息的处理方式没有任何影响。经纪人方面是否可能未正确配置某些东西?

我确定消息不会自动被确认,因为直到消费者确认它们之前,它们仍在队列中。


[经过一些测试,我发现对两个消费者进行分组是可行的。但是,对于activemq.prefetchSize: 2的一个消费者,它仍然会立即从同一组接收到两条消息。这是预期的行为吗?如果是,那么看来如果有人要处理消息以便他必须在订阅时将activemq.prefetchSize设置为1

这里是一段要测试的代码(Node.js 12.x,需要软件包@stomp/stompjswebsocket:]

Object.assign(global, { WebSocket: require('websocket').w3cwebsocket });
const { Client } = require('@stomp/stompjs');

function createClient() {
  return new Client({
    brokerURL: 'ws://localhost:61614?maximumConnections=1000&wireFormat.maxFrameSize=104857600',
  });
}

function createLog(name) {
  return (...str) => console.log.apply(console, [`${new Date().toISOString().split('T')[1]} <${name}>`, ...str]);
}

function createConsumer(name) {
  const log = createLog(name);
  const client = createClient();

  client.onConnect = () => {
    log('CLIENT_CONNECTED');
    client.subscribe('/queue/test', (msg) => {
      log('RECEIVED_MESSAGE');
      setTimeout(() => {
        msg.ack();
        log('ACKED_MESSAGE');
      }, 10000);
    }, {
      'activemq.prefetchSize': '2',
      ack: 'client-individual',
    });
  }

  client.activate();
}

function publishMessages() {
  const log = createLog();
  const client = createClient();

  client.onConnect = () => {
    client.publish({
      destination: '/queue/test',
      headers: {
        persistent: true,
        JMSXGroupID: 'grp',
      },
      body: 'test message 1',
    });
    client.publish({
      destination: '/queue/test',
      headers: {
        persistent: true,
        JMSXGroupID: 'grp',
      },
      body: 'test message 2',
    });
  };

  client.activate();
}

createConsumer('A');
createConsumer('B');

setTimeout(() => {
  publishMessages();
}, 2000);

输出:

22:50:03.196Z <B> CLIENT_CONNECTED
22:50:03.199Z <A> CLIENT_CONNECTED
22:50:05.195Z <B> RECEIVED_MESSAGE <-- ~same time
22:50:05.198Z <B> RECEIVED_MESSAGE <-- ~same time
22:50:15.196Z <B> ACKED_MESSAGE
22:50:15.198Z <B> ACKED_MESSAGE
activemq stomp
1个回答
0
投票

您正在看到预期的行为。如果将activemq.prefetchSize设置为大于1的值,则代理一次将向客户端分发多于1条消息。由于stompjs会在收到消息时立即调用传递给subscribe的回调函数,因此您必须自己控制有序的确认或将activemq.prefetchSize设置为1

© www.soinside.com 2019 - 2024. All rights reserved.