如何配置 ActiveMQ Artemis 服务器以跨不同协议 AMQP 和 Openwire 互操作消息

问题描述 投票:0回答:1

我有一个 ActiveMQ Artemis 服务器,配置如下:

public static void updateConfig(Configuration config) {
    try {
        config.setPersistenceEnabled(false)
              .setSecurityEnabled(false)
              .addAcceptorConfiguration("tcp", "tcp://localhost:61616")
              .addAcceptorConfiguration("amqp", "tcp://localhost:5672");
    } catch (Exception ex) {
        ex.printStackTrace();
    }
}

它可以成功地从使用 AMQP 的 Nodejs 发布-订阅应用程序和使用 OpenWire 的 Java 应用程序监听主题。使用的主题是

TestTopic

虽然这有效,但尽管服务器收到了消息,但这两个协议似乎都没有相互联合消息。

如果在我的 NodeJS 客户端上使用 AMQP 通过

TestTopic
发布消息,服务器会收到该消息并成功广播到 NodeJS 客户端订阅者,但来自 Java 应用程序的 OpenWire 上的其他侦听器永远不会观察到该消息。同样,Java OpenWire 客户端可以相互发送和接收,但 NodeJS 客户端永远不会观察到 Java 客户端消息。

JMS 服务器在两种协议上都看到成功的消息,但从未向

TestTopic
上的所有侦听器/接收器广播。

看来我的 ActiveMQ Artemis 服务器中可能缺少配置。

查看 ActiveMQ Artemis 文档,内容如下:

您应该在目标地址前加上

queue://
来使用基于队列的目的地,或使用
topic://
来使用基于主题的目的地。当省略目的地前缀时,目的地类型默认为队列。

我已经像这样配置了 NodeJS 目标主题

'topic://TestTopic'

// subscriber.js
var args = require('./options.js').options({
  'client': { default: 'my-client', describe: 'name of identifier for client container'},
  'subscription': { default: 'my-subscription', describe: 'name of identifier for subscription'},
  't': { alias: 'topic', default: 'topic://TestTopic', describe: 'name of topic to subscribe to'},
  'h': { alias: 'host', default: 'localhost', describe: 'dns or ip name of server where you want to connect'},
  'p': { alias: 'port', default: 5672, describe: 'port to connect to'}
}).help('help').argv;

var connection = require('rhea').connect({ port:args.port, host: args.host, container_id:args.client });
connection.on('receiver_open', function (context) {
  console.log('subscribed');
});
connection.on('message', function (context) {
  if (context.message.body === 'detach') {
      // detaching leaves the subscription active, so messages sent
      // while detached are kept until we attach again
      context.receiver.detach();
      context.connection.close();
  } else if (context.message.body === 'close') {
      // closing cancels the subscription
      context.receiver.close();
      context.connection.close();
  } else {
      console.log(context.message.body);
  }
});
// the identity of the subscriber is the combination of container id
// and link (i.e. receiver) name
connection.open_receiver({name:args.subscription, source:{address:args.topic, durable:2, expiry_policy:'never'}});

在java监听器中,主题配置为

TestTopic
,如下所示:

// client-sender.java
@JmsListener(destination = "TestTopic", selector = "${selector}")
public void receiveMessage(Message message) {
    String type = (message instanceof MapMessage) ? "MapMessage" : "TextMessage";
    try {
        logMessageBody(message);
        }
    } catch (Exception ex) {
        logger.error("Exception caught while logging the received message: ");
        logger.error(ex + ": " + ex.getCause());
    }
}
java amqp activemq-artemis
1个回答
0
投票

绝对支持在协议之间交换消息,并且可以通过正确的配置来工作。

值得注意的是,您引用的文档实际上是针对 ActiveMQ Classic 而不是 ActiveMQ Artemis。需要明确的是,您可以在 ActiveMQ Artemis 中使用目标前缀,但它们可以在 ActiveMQ Artemis 中进行配置,并且必须在接受器 URL 上指定才能应用。这在实际的 ActiveMQ Artemis 文档中进行了讨论。在你的情况下,你需要这个:

public static void updateConfig(Configuration config) {
    try {
        config.setPersistenceEnabled(false)
              .setSecurityEnabled(false)
              .addAcceptorConfiguration("tcp", "tcp://localhost:61616")
              .addAcceptorConfiguration("amqp", "tcp://localhost:5672?anycastPrefix=queue://;multicastPrefix=topic://");
    } catch (Exception ex) {
        ex.printStackTrace();
    }
}

需要明确的是,JMS 队列和主题目标分别“自动映射”到任播和多播。 一旦两个客户端都使用兼容的路由类型,那么消息应该能够在它们之间来回流动。请记住,发布/订阅(即 JMS 主题)语义要求在发送任何消息之前

创建订阅者。当不存在订阅者时发送的消息将被简单地丢弃。

© www.soinside.com 2019 - 2024. All rights reserved.