我使用此脚本在本地启动ActiveMQ:
docker run -p 61616:61616 -p 61614:61614 -p 8161:8161 -it -v conf:/opt/activemq/conf -v data:/opt/activemq/data rmohr/activemq
它将启动AMQ版本5.15.6。我通过Websockets(v1.2)与带有STOMP的AMQ连接。通过“测试”队列中的Web控制台,我创建了两个消息,它们均具有相同的组“ test_grp”。我启动两个进程,每个进程运行相同的逻辑:
activemq.prefetchSize: 1
和ack: client-individual
标头订阅“ / queue / test”两个进程都立即接收消息,而第二条消息应该由同一进程接收,或者至少应该在第一个消息被确认后才接收。
此外,如果我仅使用activemq.prefetchSize: 2
标头启动一个进程/订阅,则此进程将立即收到两个消息,而不是在确认第一个消息之后依次接收两个消息。
因此,似乎JMSXGroupID对消息的处理方式没有任何影响。经纪人方面是否可能未正确配置某些东西?
我确定消息不会自动被确认,因为直到消费者确认它们之前,它们仍在队列中。
[经过一些测试,我发现对两个消费者进行分组是可行的。但是,对于activemq.prefetchSize: 2
的一个消费者,它仍然会立即从同一组接收到两条消息。这是预期的行为吗?如果是,那么看来如果有人要处理消息以便他必须在订阅时将activemq.prefetchSize
设置为1
?
这里是一段要测试的代码(Node.js 12.x,需要软件包@stomp/stompjs
和websocket
:]
Object.assign(global, { WebSocket: require('websocket').w3cwebsocket });
const { Client } = require('@stomp/stompjs');
function createClient() {
return new Client({
brokerURL: 'ws://localhost:61614?maximumConnections=1000&wireFormat.maxFrameSize=104857600',
});
}
function createLog(name) {
return (...str) => console.log.apply(console, [`${new Date().toISOString().split('T')[1]} <${name}>`, ...str]);
}
function createConsumer(name) {
const log = createLog(name);
const client = createClient();
client.onConnect = () => {
log('CLIENT_CONNECTED');
client.subscribe('/queue/test', (msg) => {
log('RECEIVED_MESSAGE');
setTimeout(() => {
msg.ack();
log('ACKED_MESSAGE');
}, 10000);
}, {
'activemq.prefetchSize': '2',
ack: 'client-individual',
});
}
client.activate();
}
function publishMessages() {
const log = createLog();
const client = createClient();
client.onConnect = () => {
client.publish({
destination: '/queue/test',
headers: {
persistent: true,
JMSXGroupID: 'grp',
},
body: 'test message 1',
});
client.publish({
destination: '/queue/test',
headers: {
persistent: true,
JMSXGroupID: 'grp',
},
body: 'test message 2',
});
};
client.activate();
}
createConsumer('A');
createConsumer('B');
setTimeout(() => {
publishMessages();
}, 2000);
输出:
22:50:03.196Z <B> CLIENT_CONNECTED
22:50:03.199Z <A> CLIENT_CONNECTED
22:50:05.195Z <B> RECEIVED_MESSAGE <-- ~same time
22:50:05.198Z <B> RECEIVED_MESSAGE <-- ~same time
22:50:15.196Z <B> ACKED_MESSAGE
22:50:15.198Z <B> ACKED_MESSAGE
您正在看到预期的行为。如果将activemq.prefetchSize
设置为大于1的值,则代理一次将向客户端分发多于1条消息。由于stompjs会在收到消息时立即调用传递给subscribe
的回调函数,因此您必须自己控制有序的确认或将activemq.prefetchSize
设置为1
。