Apache Camel从一个ActiveMQ实例上的多个队列中消费

问题描述 投票:0回答:1

我有一个ActiveMQ服务器实例,它运行着多个数据源,如果消息使用失败,则将数据推入两个队列+一个DLQ。我使用Apache Camel来消费和处理这些队列中的消息,并希望将其写入InfluxDB。

但是,到目前为止,我未能使堆栈运行,因此Apache Camel并行使用了所有队列。我总是遇到这种错误:

ERROR 20831 --- [r[ActiveMQ.DLQ]]
c.c.j.DefaultJmsMessageListenerContainer : Could not refresh JMS
Connection for destination 'ActiveMQ.DLQ' - retrying using
FixedBackOff{interval=5000, currentAttempts=270,
maxAttempts=unlimited}. Cause: Broker: localhost - Client: Influx
Message Queue already connected from tcp://ip-of-machine-running-route:port

如何由一个Apache Camel实例从多个队列中消费?

我尝试了两种方法:

当前,我的代码如下:

骆驼配置

@Configuration
public class CamelConfig {

    @Bean
    public ShutdownStrategy shutdownStrategy() {
        MessageLogger.logInfo(getClass(), "Camel Route: STARTING...",
                Thread.currentThread().getStackTrace()[0].getMethodName());
        DefaultShutdownStrategy strategy = new DefaultShutdownStrategy();
        int                     timeout  = 1200;
        MessageLogger.logInfo(getClass(), "Camel Route: Timeout for shutdown: " + timeout + " seconds.",
                Thread.currentThread().getStackTrace()[0].getMethodName());
        strategy.setTimeout(timeout); // TODO make it configurable
        return strategy;
    }
}

ActiveMQ客户端配置

@Configuration
public class ActiveMqClientConfig {

    @Bean
    public ActiveMQConnectionFactory registerActiveMQConnectionFactory() {
        MessageLogger.logInfo(getClass(), "ActiveMQ Listener: STARTING...",
                Thread.currentThread().getStackTrace()[0].getMethodName());
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
        connectionFactory.setBrokerURL("tcp://servername:61616");
        connectionFactory.setUserName(username);
        connectionFactory.setPassword(passwd);
        connectionFactory.setUseAsyncSend(false);
        connectionFactory.setClientID("Influx Message Queue");
        connectionFactory.setConnectResponseTimeout(300);
        MessageLogger.logInfo(getClass(), "ActiveMQ Listener: STARTED",
                Thread.currentThread().getStackTrace()[0].getMethodName());
        return connectionFactory;
    }
}

Influx Config

@Configuration
public class InfluxDBClientConfig {

    @Bean
    public InfluxDbOkHttpClientBuilderProvider registerInfluxDbOkHttpClientBuilderProvider() {
        return () -> {
            MessageLogger.logInfo(getClass(), "InfluxDB Client: STARTING...",
                    Thread.currentThread().getStackTrace()[0].getMethodName());
            Builder builder = new OkHttpClient.Builder() //
                    .readTimeout(1200, TimeUnit.SECONDS) //
                    .writeTimeout(1200, TimeUnit.SECONDS) //
                    .connectTimeout(1200, TimeUnit.SECONDS) //
                    .retryOnConnectionFailure(true);
            MessageLogger.logInfo(getClass(), "InfluxDB Client: STARTED - " + builder.toString(),
                    Thread.currentThread().getStackTrace()[0].getMethodName());
            return builder;
        };
    }
}

具有多条路线的组件:

@Component
public class ActiveMqToInfluxRoute extends RouteBuilder {
    @Autowired
    private FrameworkConfig frameworkConfig;

    @Override
    public void configure() throws Exception {
        String consumerQueueq = "activemq:queue:queue1?"               //
                + "brokerURL=tcp://ip:port";
        String consumerActiveMqDLQ    = "activemq:queue:ActiveMQ.DLQ?"                     //
                + "brokerURL=tcp://ip:port";
        String consumerQueue2           = "activemq:queue:queue2?"                     //
                + "brokerURL=tcp://ip:port";
        String emitterInfluxDB        = "influxdb://influxDb?databaseName=databaseName"          
                + "&batch=true"                                                            //
                + "&retentionPolicy=retentionPolicy"
        String emitterStreamOut       = "stream:out";

        //************************************************************************
        // Data from cryring_db_inbound to InfluxDB
        //************************************************************************
        from(consumerCryringInbound) //   
                .process(messagePayload -> {
                    Message message = messagePayload.getIn();
                    if (message.getBody(String.class).toString().startsWith("@MultiRecords")) {
                        Processor.processMessage(message.getBody(String.class), message);
                    } else {
                        Processor.processMessage(message);
                    }
                })//
                .to(emitterInfluxDB) //
                .onException(Exception.class) //
                .useOriginalMessage() //
                .handled(true) //
                .log("error") //
                .to(emitterStreamOut);

        //************************************************************************
        // Data from cryring_db_inbound to InfluxDB
        //************************************************************************
        from(consumerActiveMqDLQ) //
                .process(messagePayload -> {
                    Message message = messagePayload.getIn();
                    if (message.getBody(String.class).toString().startsWith("@MultiRecords")) {
                        Processor.processMessage(message.getBody(String.class), message);
                    } else {
                        Processor.processMessage(message);
                    }
                })//
                .to(emitterInfluxDB) //
                .onException(Exception.class) //
                .useOriginalMessage() //
                .handled(true) //
                .log("error") //
                .to(emitterStreamOut);

        //************************************************************************
        // Data from olog_inbound to olog
        //************************************************************************
        from(consumerOlog) //
                .process(messagePayload -> {
                    System.out.println(messagePayload.getIn());
                }) //
                .to(emitterStreamOut);
    }
}
java apache-camel activemq
1个回答
0
投票

[只有一个客户端可以使用ClientID。它们必须是唯一的,并且不是您可能要手动设置的内容。另一个选择可能是设置ClientIDPrefix,以便获得更好的方法来识别哪个应用正在使用。

© www.soinside.com 2019 - 2024. All rights reserved.