我有一个ActiveMQ服务器实例,它运行着多个数据源,如果消息使用失败,则将数据推入两个队列+一个DLQ。我使用Apache Camel来消费和处理这些队列中的消息,并希望将其写入InfluxDB。
但是,到目前为止,我未能使堆栈运行,因此Apache Camel并行使用了所有队列。我总是遇到这种错误:
ERROR 20831 --- [r[ActiveMQ.DLQ]]
c.c.j.DefaultJmsMessageListenerContainer : Could not refresh JMS
Connection for destination 'ActiveMQ.DLQ' - retrying using
FixedBackOff{interval=5000, currentAttempts=270,
maxAttempts=unlimited}. Cause: Broker: localhost - Client: Influx
Message Queue already connected from tcp://ip-of-machine-running-route:port
如何由一个Apache Camel实例从多个队列中消费?
我尝试了两种方法:
当前,我的代码如下:
骆驼配置
@Configuration
public class CamelConfig {
@Bean
public ShutdownStrategy shutdownStrategy() {
MessageLogger.logInfo(getClass(), "Camel Route: STARTING...",
Thread.currentThread().getStackTrace()[0].getMethodName());
DefaultShutdownStrategy strategy = new DefaultShutdownStrategy();
int timeout = 1200;
MessageLogger.logInfo(getClass(), "Camel Route: Timeout for shutdown: " + timeout + " seconds.",
Thread.currentThread().getStackTrace()[0].getMethodName());
strategy.setTimeout(timeout); // TODO make it configurable
return strategy;
}
}
ActiveMQ客户端配置
@Configuration
public class ActiveMqClientConfig {
@Bean
public ActiveMQConnectionFactory registerActiveMQConnectionFactory() {
MessageLogger.logInfo(getClass(), "ActiveMQ Listener: STARTING...",
Thread.currentThread().getStackTrace()[0].getMethodName());
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
connectionFactory.setBrokerURL("tcp://servername:61616");
connectionFactory.setUserName(username);
connectionFactory.setPassword(passwd);
connectionFactory.setUseAsyncSend(false);
connectionFactory.setClientID("Influx Message Queue");
connectionFactory.setConnectResponseTimeout(300);
MessageLogger.logInfo(getClass(), "ActiveMQ Listener: STARTED",
Thread.currentThread().getStackTrace()[0].getMethodName());
return connectionFactory;
}
}
Influx Config
@Configuration
public class InfluxDBClientConfig {
@Bean
public InfluxDbOkHttpClientBuilderProvider registerInfluxDbOkHttpClientBuilderProvider() {
return () -> {
MessageLogger.logInfo(getClass(), "InfluxDB Client: STARTING...",
Thread.currentThread().getStackTrace()[0].getMethodName());
Builder builder = new OkHttpClient.Builder() //
.readTimeout(1200, TimeUnit.SECONDS) //
.writeTimeout(1200, TimeUnit.SECONDS) //
.connectTimeout(1200, TimeUnit.SECONDS) //
.retryOnConnectionFailure(true);
MessageLogger.logInfo(getClass(), "InfluxDB Client: STARTED - " + builder.toString(),
Thread.currentThread().getStackTrace()[0].getMethodName());
return builder;
};
}
}
具有多条路线的组件:
@Component
public class ActiveMqToInfluxRoute extends RouteBuilder {
@Autowired
private FrameworkConfig frameworkConfig;
@Override
public void configure() throws Exception {
String consumerQueueq = "activemq:queue:queue1?" //
+ "brokerURL=tcp://ip:port";
String consumerActiveMqDLQ = "activemq:queue:ActiveMQ.DLQ?" //
+ "brokerURL=tcp://ip:port";
String consumerQueue2 = "activemq:queue:queue2?" //
+ "brokerURL=tcp://ip:port";
String emitterInfluxDB = "influxdb://influxDb?databaseName=databaseName"
+ "&batch=true" //
+ "&retentionPolicy=retentionPolicy"
String emitterStreamOut = "stream:out";
//************************************************************************
// Data from cryring_db_inbound to InfluxDB
//************************************************************************
from(consumerCryringInbound) //
.process(messagePayload -> {
Message message = messagePayload.getIn();
if (message.getBody(String.class).toString().startsWith("@MultiRecords")) {
Processor.processMessage(message.getBody(String.class), message);
} else {
Processor.processMessage(message);
}
})//
.to(emitterInfluxDB) //
.onException(Exception.class) //
.useOriginalMessage() //
.handled(true) //
.log("error") //
.to(emitterStreamOut);
//************************************************************************
// Data from cryring_db_inbound to InfluxDB
//************************************************************************
from(consumerActiveMqDLQ) //
.process(messagePayload -> {
Message message = messagePayload.getIn();
if (message.getBody(String.class).toString().startsWith("@MultiRecords")) {
Processor.processMessage(message.getBody(String.class), message);
} else {
Processor.processMessage(message);
}
})//
.to(emitterInfluxDB) //
.onException(Exception.class) //
.useOriginalMessage() //
.handled(true) //
.log("error") //
.to(emitterStreamOut);
//************************************************************************
// Data from olog_inbound to olog
//************************************************************************
from(consumerOlog) //
.process(messagePayload -> {
System.out.println(messagePayload.getIn());
}) //
.to(emitterStreamOut);
}
}
[只有一个客户端可以使用ClientID
。它们必须是唯一的,并且不是您可能要手动设置的内容。另一个选择可能是设置ClientIDPrefix
,以便获得更好的方法来识别哪个应用正在使用。