使用 Spring Boot 覆盖 kafka 属性

问题描述 投票:0回答:1

我尝试通过加密我的 kafka 属性来保护我的 spring boot 应用程序

例如,要运行我的 jar,我这样做:

java -jar myjar.jar --spring.kafka.bootstrap-servers='encryptedServer' --spring.kafka.properties.ssl.keystore.password='encryptedKeystore'  --spring.kafka.properties.ssl.truststore.password='encryptedTruststore'

我想在应用程序启动期间解密我的属性

我在网上找到了一些东西,然后我就这么做了

public class DecoderEnvironmentPostProcessor implements EnvironmentPostProcessor {

    private static final List<String> ENCODED_PASSWORD = new ArrayList<String>() {{
        add("spring.kafka.bootstrap-servers");
        add("spring.kafka.properties.ssl.keystore.password");
        add("spring.kafka.properties.ssl.truststore.password");
    }};
    
    @Override
    public void postProcessEnvironment(ConfigurableEnvironment environment, SpringApplication application) {
        for (String property : ENCODED_PASSWORD) {
            String encodedPassword = environment.getProperty(property);
            if (encodedPassword != null) {
                try {
                    String decodedPassword = myDecrypt(encodedPassword);
                    environment.getSystemProperties().put(property, decodedPassword);
                } catch (Exception e) {
                    //TODO
                }
            }
        }
    }
}

并在文件 src/main/resources/META-INF/spring.factories 中添加了“org.springframework.boot.env.EnvironmentPostProcessor=com.my.app.DecoderEnvironmentPostProcessor”

代码很好地解密了我的属性,但是当我启动应用程序时,此类中出现错误:

@Configuration
@EnableKafka
public class KafkaConsumerConfig implements KafkaListenerConfigurer {

    @Override
    public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
        try {
            Optional<LocalConfig> optionalLocalConfig = localConfigManager.loadConfig();
            this.registrar = registrar;

                MethodKafkaListenerEndpoint<String, JsonNode> endpoint = new MethodKafkaListenerEndpoint<>();
// endpoint setId,setTopics etc ..
                
             registrar.registerEndpoint(endpoint); 

        } catch (KafkaException e) {
            //TODO
        }
    }

“registrar.registerEndpoint(endpoint)”行给出错误

2024-04-18 17:52:57,089 ERROR : configureKafkaListeners Error while configuring kafka listeners, service will be restarted
org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
....
Caused by: org.apache.kafka.common.config.ConfigException: Invalid url in bootstrap.servers: encryptedServer
  at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:59)
  at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:48)
  at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:728)
  ... 162 common frames omitted

但是 bootstrap.servers 应该是 decryptedServer 而不是 encrytedServer

spring-boot
1个回答
0
投票

我找到了方法

在configureKafkaListeners中我添加了:

endpoint.setConsumerProperties(decryptEncryptedProperties());

还有

public Properties decryptEncryptedProperties() {

    Properties properties = new Properties();

    String bootstrapServers = environment.getProperty("spring.kafka.bootstrap-servers");
    bootstrapServers = mydecrypt(bootstrapServers);
    properties.put("bootstrap.servers", bootstrapServers);


    String keystorePassword = environment.getProperty("spring.kafka.properties.ssl.keystore.password");
    keystorePassword = mydecrypt(keystorePassword);
    properties.put("ssl.keystore.password", keystorePassword);


    String truststorePassword = environment.getProperty("spring.kafka.properties.ssl.truststore.password");
    truststorePassword = mydecrypt(truststorePassword);
    properties.put("ssl.truststore.password", truststorePassword);


    return properties;
}
© www.soinside.com 2019 - 2024. All rights reserved.