我有一个 ActiveMQ Artemis 服务器,配置如下:
public static void updateConfig(Configuration config) {
try {
config.setPersistenceEnabled(false)
.setSecurityEnabled(false)
.addAcceptorConfiguration("tcp", "tcp://localhost:61616")
.addAcceptorConfiguration("amqp", "tcp://localhost:5672");
} catch (Exception ex) {
ex.printStackTrace();
}
}
它可以成功地从使用 AMQP 的 Nodejs 发布-订阅应用程序和使用 OpenWire 的 Java 应用程序监听主题。使用的主题是
TestTopic
。
虽然这有效,但尽管服务器收到了消息,但这两个协议似乎都没有相互联合消息。
如果在我的 NodeJS 客户端上使用 AMQP 通过
TestTopic
发布消息,服务器会收到该消息并成功广播到 NodeJS 客户端订阅者,但来自 Java 应用程序的 OpenWire 上的其他侦听器永远不会观察到该消息。同样,Java OpenWire 客户端可以相互发送和接收,但 NodeJS 客户端永远不会观察到 Java 客户端消息。
JMS 服务器在两种协议上都看到成功的消息,但从未向
TestTopic
上的所有侦听器/接收器广播。
看来我的 ActiveMQ Artemis 服务器中可能缺少配置。
查看 ActiveMQ Artemis 文档,内容如下:
您应该在目标地址前加上
来使用基于队列的目的地,或使用queue://
来使用基于主题的目的地。当省略目的地前缀时,目的地类型默认为队列。topic://
我已经像这样配置了 NodeJS 目标主题
'topic://TestTopic'
。
// subscriber.js
var args = require('./options.js').options({
'client': { default: 'my-client', describe: 'name of identifier for client container'},
'subscription': { default: 'my-subscription', describe: 'name of identifier for subscription'},
't': { alias: 'topic', default: 'topic://TestTopic', describe: 'name of topic to subscribe to'},
'h': { alias: 'host', default: 'localhost', describe: 'dns or ip name of server where you want to connect'},
'p': { alias: 'port', default: 5672, describe: 'port to connect to'}
}).help('help').argv;
var connection = require('rhea').connect({ port:args.port, host: args.host, container_id:args.client });
connection.on('receiver_open', function (context) {
console.log('subscribed');
});
connection.on('message', function (context) {
if (context.message.body === 'detach') {
// detaching leaves the subscription active, so messages sent
// while detached are kept until we attach again
context.receiver.detach();
context.connection.close();
} else if (context.message.body === 'close') {
// closing cancels the subscription
context.receiver.close();
context.connection.close();
} else {
console.log(context.message.body);
}
});
// the identity of the subscriber is the combination of container id
// and link (i.e. receiver) name
connection.open_receiver({name:args.subscription, source:{address:args.topic, durable:2, expiry_policy:'never'}});
在java监听器中,主题配置为
TestTopic
,如下所示:
// client-sender.java
@JmsListener(destination = "TestTopic", selector = "${selector}")
public void receiveMessage(Message message) {
String type = (message instanceof MapMessage) ? "MapMessage" : "TextMessage";
try {
logMessageBody(message);
}
} catch (Exception ex) {
logger.error("Exception caught while logging the received message: ");
logger.error(ex + ": " + ex.getCause());
}
}
绝对支持在协议之间交换消息,并且可以通过正确的配置来工作。
值得注意的是,您引用的文档实际上是针对 ActiveMQ Classic 而不是 ActiveMQ Artemis。需要明确的是,您可以在 ActiveMQ Artemis 中使用目标前缀,但它们可以在 ActiveMQ Artemis 中进行配置,并且必须在接受器 URL 上指定才能应用。这在实际的 ActiveMQ Artemis 文档中进行了讨论。在你的情况下,你需要这个:
public static void updateConfig(Configuration config) {
try {
config.setPersistenceEnabled(false)
.setSecurityEnabled(false)
.addAcceptorConfiguration("tcp", "tcp://localhost:61616")
.addAcceptorConfiguration("amqp", "tcp://localhost:5672?anycastPrefix=queue://;multicastPrefix=topic://");
} catch (Exception ex) {
ex.printStackTrace();
}
}
需要明确的是,JMS 队列和主题目标分别“自动映射”到任播和多播。 一旦两个客户端都使用兼容的路由类型,那么消息应该能够在它们之间来回流动。请记住,发布/订阅(即 JMS 主题)语义要求在发送任何消息之前
创建订阅者。当不存在订阅者时发送的消息将被简单地丢弃。