如何将 Apache Camel 与 MSK 架构注册表连接

问题描述 投票:0回答:1

我已经在 AWS GLUE 的 MSK 中创建了一个架构,但我无法使用应用程序的生产者来配置它。

在 MKS 中创建的架构:

Schema Created

Kafka 配置

class KafkaConfig {

    @Value("${brokerHost}")
    String brokerHost;

    @Value("${saslMechanism}")
    String saslMechanism;
    @Value("${securityProtocol}")
    String securityProtocol;
    @Value("${brokerConfig}")
    String brokerConfig;
    @Autowired
    CamelContext camelContext;

    @Bean
    public KafkaConfiguration kafkaConfiguration() {
        KafkaConfiguration kafkaConfiguration = new KafkaConfiguration();
        kafkaConfiguration.setBrokers(brokerHost);
        kafkaConfiguration.setSaslMechanism(saslMechanism);
        kafkaConfiguration.setSecurityProtocol(securityProtocol);
        kafkaConfiguration.setSaslJaasConfig(brokerConfig);
        return kafkaConfiguration;
    }

    @Bean(name = "appKafkaProducer")
    public KafkaComponent kafkaComponentProducer() throws Exception {
        KafkaComponent kafkaComponentProducer = new KafkaComponent();
        kafkaComponentProducer.setConfiguration(kafkaConfiguration());;
        camelContext.addComponent("appKafkaProducer", kafkaComponentProducer);
        return kafkaComponentProducer;
    }

    @Bean(name = "appKafkaConsumer")
    public KafkaComponent kafkaComponentConsumer() {
        KafkaComponent kafkaComponentConsumer = new KafkaComponent();
        kafkaComponentConsumer.setConfiguration(kafkaConfiguration());
        camelContext.addComponent("appKafkaConsumer", kafkaComponentConsumer);
        return kafkaComponentConsumer;
    }




}

生产者路线:

<get uri="/refresh">
    <description>Refresh the Route</description>
    <route id="refreshRoute">
        <doTry>
        
            <to uri="appKafkaProducer:Checking" />
            <setBody>
                <constant>ok</constant>
            </setBody>
            <doCatch>
                <exception>java.lang.Exception</exception>
                <to uri="bean:commonsExchangeUtil?method=setErrorResponse(500, 'Internal Server Error', ${exception.message}, *)" />
            </doCatch>
        </doTry>
    </route>
</get>

我查了Camel的官方文档,但找不到合适的方法

我需要添加哪些配置,以便我能够使用模式配置生产者,以便我们可以验证

java amazon-web-services apache-kafka apache-camel
1个回答
0
投票

您可以尝试为 kafka 组件添加 additionalProperties 作为键值对,并设置 AWS 文档

中指定的属性
© www.soinside.com 2019 - 2024. All rights reserved.