NodeJS rhea AMQP客户端的故障转移

问题描述 投票:0回答:1

我在计算机上运行了多个ActiveMQ实例。它们被配置为shared file system master slave。如果一台ActiveMQ服务器已关闭,则另一台应自动启动。如预期般运作。

ActiveMQ的first实例的相关配置:

<transportConnectors>
    <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
    <transportConnector name="openwire" uri="tcp://0.0.0.0:61626?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
    <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
    <transportConnector name="stomp" uri="stomp://0.0.0.0:61623?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
    <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1889?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
    <transportConnector name="ws" uri="ws://0.0.0.0:61625?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
</transportConnectors>
<persistenceAdapter>
    <kahaDB directory="/Users/srikanth.doddi/Downloads/apachemq3.2.4/data/kahadb"/>
</persistenceAdapter>

ActiveMQ的second实例的相关配置:

<transportConnectors>
    <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
    <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
    <transportConnector name="amqp" uri="amqp://0.0.0.0:5673?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
    <transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
    <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
    <transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
</transportConnectors>
<persistenceAdapter>
    <kahaDB directory="/Users/srikanth.doddi/Downloads/apachemqold/data/kahadb"/>
</persistenceAdapter>

我正在NodeJS中使用AMQP连接,通过以下方式使用rhea

var container = require('rhea');
container.on('message', function (context) {
    console.log(context.message.body);
    context.connection.close();
});
container.once('sendable', function (context) {
    context.sender.send({body:'Hello World!'});
});
var connection = container.connect({'port':5672});
connection.open_receiver('examples');
connection.open_sender('examples');

现在,如果5672出现故障,我的ActiveMQ将作为主从设备运行,我希望客户端自动连接到5673并继续工作。此检查应连续进行。

这是它在Spring-boot中的实现方式

activemq_url=tcp://localhost:61616,tcp://localhost:61626

spring.activemq.broker-url=failover://(${activemq_url})?randomize=false
node.js activemq amqp rhea
1个回答
0
投票

[您必须做一些工作来构建客户端代码,以便进行重新连接,rhea没有像Qpid JMS或Artemis Core JMS客户端那样的客户端具有那种故障转移处理支持。

Rhea源代码树中有一个重新连接example,您可以以此为起点将该逻辑构建到自己的应用程序中。

© www.soinside.com 2019 - 2024. All rights reserved.