我正在使用 Apache Camel 连接和订阅 ActiveMQ Artemis 源并转发到 Kafka 主题。
它运行了一段时间,但随后因异常而停止:
jakarta.jms.JMSSecurityException: username '[email protected]' has too many connections
很明显,Camel 的工作方式中有一些 ActiveMQ Artemis 或连接一侧的某些安全设置不喜欢的地方。我知道我正在连接的提要确实进行了一些检查,以防止应用程序快速连续多次连接和断开连接;所以我想我做到了。
仔细观察,camel 似乎会定期重新连接和拉取,而不是创建单个连接(甚至每个 Camel 路线创建一个连接);所以我认为这导致经纪人把我踢出去。
还有其他人遇到过这个吗?有什么想法吗?
参考代码:
ActiveMQComponent amqComponent = new ActiveMQComponent();
amqComponent.setUsername(amqUser);
amqComponent.setPassword(amqPassword);
amqComponent.setClientId("clientID");
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory();
cf.setTrustedPackages(List.of("com..."));
cf.setBrokerURL("tcp://" + amqServer + ":" + amqPort + "?jms.watchTopicAdvisories=false");
amqComponent.setConnectionFactory(cf);
//amqComponent.setMaxConcurrentConsumers(1);
//amqComponent.setSubscriptionShared(true);
//amqComponent.setUsePooledConnection(true);
context.addComponent("activemq", amqComponent);
//...
from("activemq:topic:" + amqFeedTopic + "?clientId=" + clientid + "&durableSubscriptionName=" + clientid + "-sub")
.id(clientid)
.description("Connection for " + clientid)
.to("kafka:" + kafkaTopic + "?brokers=" + kafkaBootstrapServers);
这个答案:Spring Integration: How to use SingleConnectionFactory with ActiveMQ? 为我指明了正确的方向。
本质上,连接工厂决定了方法;默认连接工厂会定期断开连接并重新连接。
要解决此问题,请使用
SingleConnectionFactory
或其子类 CachingConnectionFactory
代替。
现在,这些不能像标准 AMQ 那样接受用户/密码,因此您需要使用
UserCredentialsConnectionFactoryAdapter
来提供这些。
简单吧?
这是一些代码:
ActiveMQComponent amqComponent = new ActiveMQComponent();
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory();
cf.setBrokerURL("tcp://" + amqServer + ":" + amqPort + " jms.watchTopicAdvisories=false");
UserCredentialsConnectionFactoryAdapter uca = new UserCredentialsConnectionFactoryAdapter();
uca.setUsername(amqUser);
uca.setPassword(amqPassword);
uca.setTargetConnectionFactory(cf);
CachingConnectionFactory ccf = new CachingConnectionFactory(uca);
ccf.setClientId(amqClientID);
amqComponent.setConnectionFactory(ccf);
amqComponent.setMaxConcurrentConsumers(1);
context.addComponent("activemq", amqComponent);