Camel 多次连接到 ActiveMQ Artemis

问题描述 投票:0回答:1

我正在使用 Apache Camel 连接和订阅 ActiveMQ Artemis 源并转发到 Kafka 主题。

它运行了一段时间,但随后因异常而停止:

jakarta.jms.JMSSecurityException: username '[email protected]' has too many connections

很明显,Camel 的工作方式中有一些 ActiveMQ Artemis 或连接一侧的某些安全设置不喜欢的地方。我知道我正在连接的提要确实进行了一些检查,以防止应用程序快速连续多次连接和断开连接;所以我想我做到了。

仔细观察,camel 似乎会定期重新连接和拉取,而不是创建单个连接(甚至每个 Camel 路线创建一个连接);所以我认为这导致经纪人把我踢出去。

还有其他人遇到过这个吗?有什么想法吗?

参考代码:

ActiveMQComponent amqComponent = new ActiveMQComponent();
amqComponent.setUsername(amqUser);
amqComponent.setPassword(amqPassword);
amqComponent.setClientId("clientID");

ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory();
cf.setTrustedPackages(List.of("com..."));
cf.setBrokerURL("tcp://" + amqServer + ":" + amqPort + "?jms.watchTopicAdvisories=false");
amqComponent.setConnectionFactory(cf);
//amqComponent.setMaxConcurrentConsumers(1);
//amqComponent.setSubscriptionShared(true);
//amqComponent.setUsePooledConnection(true);
           
context.addComponent("activemq", amqComponent);

//...

from("activemq:topic:" + amqFeedTopic + "?clientId=" + clientid  + "&durableSubscriptionName=" + clientid + "-sub")
                .id(clientid)
                .description("Connection for " + clientid)
                .to("kafka:" + kafkaTopic + "?brokers=" + kafkaBootstrapServers);
java apache-camel activemq-artemis
1个回答
0
投票

这个答案:Spring Integration: How to use SingleConnectionFactory with ActiveMQ? 为我指明了正确的方向。

本质上,连接工厂决定了方法;默认连接工厂会定期断开连接并重新连接。

要解决此问题,请使用

SingleConnectionFactory
或其子类
CachingConnectionFactory
代替。

现在,这些不能像标准 AMQ 那样接受用户/密码,因此您需要使用

UserCredentialsConnectionFactoryAdapter
来提供这些。

简单吧?

这是一些代码:

ActiveMQComponent amqComponent = new ActiveMQComponent();
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory();
cf.setBrokerURL("tcp://" + amqServer + ":" + amqPort + " jms.watchTopicAdvisories=false");
       
UserCredentialsConnectionFactoryAdapter uca = new UserCredentialsConnectionFactoryAdapter();
uca.setUsername(amqUser);
uca.setPassword(amqPassword);
uca.setTargetConnectionFactory(cf);
            
CachingConnectionFactory ccf = new CachingConnectionFactory(uca);
ccf.setClientId(amqClientID);
            
amqComponent.setConnectionFactory(ccf);
amqComponent.setMaxConcurrentConsumers(1);
            
context.addComponent("activemq", amqComponent);
© www.soinside.com 2019 - 2024. All rights reserved.