Spring WebFlux: unable to start Kafka consumer

When I start the application against a local Kafka broker, everything works fine.

But when I try to connect to my hosted broker using SSL authentication, I get the following error:

2023-03-22 12:07:29,309 ERROR[restartedMain] InputEventConsumer - error while consuming message
org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:823)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:664)
    at reactor.kafka.receiver.internals.ConsumerFactory.createConsumer(ConsumerFactory.java:34)
    at reactor.kafka.receiver.internals.DefaultKafkaReceiver.lambda$withHandler$16(DefaultKafkaReceiver.java:132)
    at reactor.core.publisher.MonoCallable.call(MonoCallable.java:92)
    at reactor.core.publisher.FluxUsingWhen.subscribe(FluxUsingWhen.java:81)
    at reactor.core.publisher.FluxDefer.subscribe(FluxDefer.java:54)
    at reactor.core.publisher.Flux.subscribe(Flux.java:8642)
    at reactor.core.publisher.Flux.subscribeWith(Flux.java:8815)
    at reactor.core.publisher.Flux.subscribe(Flux.java:8608)
    at reactor.core.publisher.Flux.subscribe(Flux.java:8532)
    at reactor.core.publisher.Flux.subscribe(Flux.java:8502)
    at messaging.inputEvent.InputEventConsumer.consumeMessage(InputEventConsumer.java:55)
    at messaging.inputEvent.InputEventConsumerInitialiser.lambda$initialiseConsumers$0(InputEventConsumerInitialiser.java:42)
    at java.util.stream.Streams$RangeIntSpliterator.forEachRemaining(Streams.java:110)
    at java.util.stream.IntPipeline$Head.forEach(IntPipeline.java:581)
    at messaging.inputEvent.InputEventConsumerInitialiser.initialiseConsumers(InputEventConsumerInitialiser.java:38)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.springframework.context.event.ApplicationListenerMethodAdapter.doInvoke(ApplicationListenerMethodAdapter.java:344)
    at org.springframework.context.event.ApplicationListenerMethodAdapter.processEvent(ApplicationListenerMethodAdapter.java:229)
    at org.springframework.context.event.ApplicationListenerMethodAdapter.onApplicationEvent(ApplicationListenerMethodAdapter.java:166)
    at org.springframework.context.event.SimpleApplicationEventMulticaster.doInvokeListener(SimpleApplicationEventMulticaster.java:176)
    at org.springframework.context.event.SimpleApplicationEventMulticaster.invokeListener(SimpleApplicationEventMulticaster.java:169)
    at org.springframework.context.event.SimpleApplicationEventMulticaster.multicastEvent(SimpleApplicationEventMulticaster.java:143)
    at org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:421)
    at org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:378)
    at org.springframework.boot.context.event.EventPublishingRunListener.ready(EventPublishingRunListener.java:114)
    at org.springframework.boot.SpringApplicationRunListeners.lambda$ready$6(SpringApplicationRunListeners.java:82)
    at java.util.ArrayList.forEach(ArrayList.java:1259)
    at org.springframework.boot.SpringApplicationRunListeners.doWithListeners(SpringApplicationRunListeners.java:120)
    at org.springframework.boot.SpringApplicationRunListeners.doWithListeners(SpringApplicationRunListeners.java:114)
    at org.springframework.boot.SpringApplicationRunListeners.ready(SpringApplicationRunListeners.java:82)
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:322)
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1303)
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1292)
    at SalesTenderEventConsumerApplication.main(SalesTenderEventConsumerApplication.java:10)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.springframework.boot.devtools.restart.RestartLauncher.run(RestartLauncher.java:49)
Caused by: org.apache.kafka.common.KafkaException: java.security.UnrecoverableKeyException: Cannot recover key
    at org.apache.kafka.common.security.ssl.DefaultSslEngineFactory.createSSLContext(DefaultSslEngineFactory.java:268)
    at org.apache.kafka.common.security.ssl.DefaultSslEngineFactory.configure(DefaultSslEngineFactory.java:173)
    at org.apache.kafka.common.security.ssl.SslFactory.instantiateSslEngineFactory(SslFactory.java:140)
    at org.apache.kafka.common.security.ssl.SslFactory.configure(SslFactory.java:97)
    at org.apache.kafka.common.network.SslChannelBuilder.configure(SslChannelBuilder.java:73)
    at org.apache.kafka.common.network.ChannelBuilders.create(ChannelBuilders.java:192)
    at org.apache.kafka.common.network.ChannelBuilders.clientChannelBuilder(ChannelBuilders.java:81)
    at org.apache.kafka.clients.ClientUtils.createChannelBuilder(ClientUtils.java:105)
    at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:736)
    ... 43 common frames omitted
Caused by: java.security.UnrecoverableKeyException: Cannot recover key
    at sun.security.provider.KeyProtector.recover(KeyProtector.java:315)
    at sun.security.provider.JavaKeyStore.engineGetKey(JavaKeyStore.java:143)
    at sun.security.provider.JavaKeyStore$JKS.engineGetKey(JavaKeyStore.java:57)
    at sun.security.provider.KeyStoreDelegator.engineGetKey(KeyStoreDelegator.java:96)
    at sun.security.provider.JavaKeyStore$DualFormatJKS.engineGetKey(JavaKeyStore.java:71)
    at java.security.KeyStore.getKey(KeyStore.java:1023)
    at sun.security.ssl.SunX509KeyManagerImpl.<init>(SunX509KeyManagerImpl.java:145)
    at sun.security.ssl.KeyManagerFactoryImpl$SunX509.engineInit(KeyManagerFactoryImpl.java:70)
    at javax.net.ssl.KeyManagerFactory.init(KeyManagerFactory.java:256)
    at org.apache.kafka.common.security.ssl.DefaultSslEngineFactory.createSSLContext(DefaultSslEngineFactory.java:251)
    ... 51 common frames omitted

My consumer code:

@EventListener(ApplicationReadyEvent.class)
public void initialiseConsumers() {
    consumers = Arrays.asList(new InputEventConsumer[kafkaConfig.getConsumersCount()]);
    IntStream.range(0, kafkaConfig.getConsumersCount()).forEach(i -> {
        consumers.set(i, applicationContext.getBean(InputEventConsumer.class));
        consumers.get(i).setConsumerIndex(i);
        consumers.get(i).setInputEventReceiver(kafkaConfig.inputEventReceiver());
        consumers.get(i).consumeMessage();
    });
}

public Disposable consumeMessage() {
    log.info("starting consumption");
    return processKafkaRecord().subscribe(record -> log.info("successfully consumed event"),
            error -> log.error("error while consuming message", error));
}

The SSL properties I add are:

        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, securityProtocol);
        props.put(SslConfigs.SSL_PROTOCOL_CONFIG, protocol);

        props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, passwordsConfig.getKafka().getSslKeyPassword());
        props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, passwordsConfig.getKafka().getSslTruststorePassword());
        props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, passwordsConfig.getKafka().getSslKeystorePassword());
        props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, "");

        sslConfigFilesLocations.getTruststorePath().ifPresent(path ->
                props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, path.toAbsolutePath().toString()));
        sslConfigFilesLocations.getKeystorePath().ifPresent(path ->
                props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, path.toAbsolutePath().toString()));
        props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
        props.put(SaslConfigs.SASL_MECHANISM, saslMechanism);
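
One thing worth checking in the properties above: SslConfigs.SSL_KEY_PASSWORD_CONFIG is put twice, first with the configured password and later with an empty string. Assuming props behaves like a java.util.Map, the second put replaces the first, so the client would try to open the private key with an empty password, which is a plausible (though unconfirmed) cause of "UnrecoverableKeyException: Cannot recover key". The sketch below reproduces only the overwrite, without any Kafka dependency; the class name, the buildProps helper and the sample password are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

public class SslKeyPasswordOverwrite {

    // String value of SslConfigs.SSL_KEY_PASSWORD_CONFIG in kafka-clients.
    static final String SSL_KEY_PASSWORD_CONFIG = "ssl.key.password";

    // Hypothetical helper that mirrors the order of the put calls in the question.
    static Map<String, Object> buildProps(String keyPassword) {
        Map<String, Object> props = new HashMap<>();
        props.put(SSL_KEY_PASSWORD_CONFIG, keyPassword); // intended value
        props.put(SSL_KEY_PASSWORD_CONFIG, "");          // later put replaces it
        return props;
    }

    public static void main(String[] args) {
        Map<String, Object> props = buildProps("hypothetical-secret");
        boolean empty = "".equals(props.get(SSL_KEY_PASSWORD_CONFIG));
        System.out.println("ssl.key.password is empty: " + empty);
    }
}
```

If the overwrite turns out to be the cause, removing the second put (or leaving ssl.key.password unset when the key shares the keystore password) would be the thing to try.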

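Now, the thing is that I have another service (not WebFlux) that uses the same configuration values, and its consumer is able to consume from the hosted broker. So the problem should not be related to the broker or the network.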

I tried to find other solutions for this error, but I could not figure out which one works.
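
The innermost cause in the trace is thrown from KeyStore.getKey on a JKS keystore, which usually means the password used for the private key entry does not match. A minimal sketch for checking this outside Kafka and Reactor follows; it loads the keystore with the plain JDK API and reports which key entries cannot be opened with a given key password. The class name, the unrecoverableAliases helper and the command-line arguments are assumptions made for illustration, and the "JKS" type is taken from the stack trace.

```java
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.security.GeneralSecurityException;
import java.security.KeyStore;
import java.security.UnrecoverableKeyException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class KeyPasswordCheck {

    // Returns the aliases of key entries that cannot be opened with keyPassword.
    static List<String> unrecoverableAliases(KeyStore ks, char[] keyPassword)
            throws GeneralSecurityException {
        List<String> failed = new ArrayList<>();
        for (String alias : Collections.list(ks.aliases())) {
            if (!ks.isKeyEntry(alias)) {
                continue; // certificate-only entries carry no key password
            }
            try {
                ks.getKey(alias, keyPassword);
            } catch (UnrecoverableKeyException e) {
                failed.add(alias);
            }
        }
        return failed;
    }

    public static void main(String[] args) throws Exception {
        if (args.length != 3) {
            System.out.println("usage: KeyPasswordCheck <keystore> <storePassword> <keyPassword>");
            return;
        }
        KeyStore ks = KeyStore.getInstance("JKS");
        try (InputStream in = Files.newInputStream(Paths.get(args[0]))) {
            ks.load(in, args[1].toCharArray());
        }
        System.out.println("Key entries that could not be recovered: "
                + unrecoverableAliases(ks, args[2].toCharArray()));
    }
}
```

Running it once with the real key password and once with an empty string as the third argument should show whether an empty ssl.key.password alone is enough to reproduce the failure.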

Also, in case it helps, I am using the following dependencies:

implementation 'org.springframework.boot:spring-boot-starter-webflux'
implementation 'org.projectlombok:lombok'
implementation "io.projectreactor.kafka:reactor-kafka:1.3.15"
developmentOnly 'org.springframework.boot:spring-boot-devtools'
annotationProcessor 'org.projectlombok:lombok'
testImplementation 'org.springframework.boot:spring-boot-starter-test'
testImplementation 'io.projectreactor:reactor-test'
implementation 'io.projectreactor.addons:reactor-extra'
implementation group: 'org.apache.commons', name: 'commons-lang3', version: '3.0'
implementation group: 'com.github.ben-manes.caffeine', name: 'caffeine', version: '2.8.8'
implementation group: 'com.azure', name: 'azure-security-keyvault-keys', version: '4.2.8'
implementation group: 'com.azure', name: 'azure-identity', version: '1.2.1'
testImplementation 'org.spockframework:spock-core:2.0-M4-groovy-3.0'
testImplementation 'org.codehaus.groovy:groovy-all:3.0.7'
testImplementation 'io.mockk:mockk:1.12.0'
apache-kafka spring-webflux project-reactor