I have created a schema in AWS Glue for MSK, but I am unable to configure my application's producer to use it.
Schema created in MSK:
Kafka configuration:
import org.apache.camel.CamelContext;
import org.apache.camel.component.kafka.KafkaComponent;
import org.apache.camel.component.kafka.KafkaConfiguration;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;

class KafkaConfig {

    // Connection and security settings injected from application properties
    @Value("${brokerHost}")
    String brokerHost;

    @Value("${saslMechanism}")
    String saslMechanism;

    @Value("${securityProtocol}")
    String securityProtocol;

    @Value("${brokerConfig}")
    String brokerConfig;

    @Autowired
    CamelContext camelContext;

    // Shared broker and SASL settings used by both components
    @Bean
    public KafkaConfiguration kafkaConfiguration() {
        KafkaConfiguration kafkaConfiguration = new KafkaConfiguration();
        kafkaConfiguration.setBrokers(brokerHost);
        kafkaConfiguration.setSaslMechanism(saslMechanism);
        kafkaConfiguration.setSecurityProtocol(securityProtocol);
        kafkaConfiguration.setSaslJaasConfig(brokerConfig);
        return kafkaConfiguration;
    }

    // Registered in the Camel context under the scheme "appKafkaProducer"
    @Bean(name = "appKafkaProducer")
    public KafkaComponent kafkaComponentProducer() throws Exception {
        KafkaComponent kafkaComponentProducer = new KafkaComponent();
        kafkaComponentProducer.setConfiguration(kafkaConfiguration());
        camelContext.addComponent("appKafkaProducer", kafkaComponentProducer);
        return kafkaComponentProducer;
    }

    // Registered in the Camel context under the scheme "appKafkaConsumer"
    @Bean(name = "appKafkaConsumer")
    public KafkaComponent kafkaComponentConsumer() {
        KafkaComponent kafkaComponentConsumer = new KafkaComponent();
        kafkaComponentConsumer.setConfiguration(kafkaConfiguration());
        camelContext.addComponent("appKafkaConsumer", kafkaComponentConsumer);
        return kafkaComponentConsumer;
    }
}
Producer route:
<get uri="/refresh">
    <description>Refresh the Route</description>
    <route id="refreshRoute">
        <doTry>
            <to uri="appKafkaProducer:Checking" />
            <setBody>
                <constant>ok</constant>
            </setBody>
            <doCatch>
                <exception>java.lang.Exception</exception>
                <to uri="bean:commonsExchangeUtil?method=setErrorResponse(500, 'Internal Server Error', ${exception.message}, *)" />
            </doCatch>
        </doTry>
    </route>
</get>
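I checked the official Camel documentation but could not find a suitable way to do this.
What configuration do I need to add so that the producer uses the schema and messages can be validated against it?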
You can try adding additionalProperties as key-value pairs to the Kafka component, setting the properties specified in the AWS documentation.
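As a rough illustration of that suggestion, the sketch below only builds the key-value map that would be handed to the Kafka component. The class name GlueSchemaRegistryProps, the placeholder values, and the choice of AVRO are hypothetical. The property keys ("region", "registry.name", "schemaName", "dataFormat") and the serializer class name are what I recall from the AWS Glue Schema Registry client library, so please check them against the AWS documentation for the version you use.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of Camel or the AWS library.
final class GlueSchemaRegistryProps {

    // Assumption: serializer class shipped with the AWS Glue Schema Registry
    // client library; verify the name against the version on your classpath.
    static final String GLUE_SERIALIZER =
        "com.amazonaws.services.schemaregistry.serializers.GlueSchemaRegistryKafkaSerializer";

    private GlueSchemaRegistryProps() {
    }

    // Collects the extra Kafka client properties read by the Glue serializer.
    // Assumption: the keys below match the constants in the AWS library.
    static Map<String, Object> build(String region, String registryName,
                                     String schemaName, String dataFormat) {
        Map<String, Object> props = new LinkedHashMap<>();
        props.put("region", region);
        props.put("registry.name", registryName);
        props.put("schemaName", schemaName);
        props.put("dataFormat", dataFormat);
        return props;
    }

    public static void main(String[] args) {
        // Placeholder values; replace with your own region, registry and schema.
        Map<String, Object> props =
            build("us-east-1", "my-registry", "my-schema", "AVRO");
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

In kafkaConfiguration() you would then, assuming a Camel 3.x version whose KafkaConfiguration exposes these setters, call kafkaConfiguration.setValueSerializer(GlueSchemaRegistryProps.GLUE_SERIALIZER) and kafkaConfiguration.setAdditionalProperties(props). Older Camel versions name the serializer option differently, so check the Kafka component documentation for your release. The same properties can also be passed on the endpoint URI with the additionalProperties. prefix, for example appKafkaProducer:Checking?additionalProperties.region=us-east-1. The AWS serializer library has to be on the classpath in either case.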