依赖更新后构建kafka生产者失败

问题描述 投票:0回答:1

在我的 SpringBoot Java 项目中,我使用的是 kafka,特别是 ReactiveKafka。我正在更新依赖项,特别是这些:

  1. springboot 2.6.6 -> 3.1.5
  2. spring-kafka 2.8.0 -> 3.0.11
  3. reactor-kafka 1.3.13 -> 1.3.20
  4. java版本17 更新后,我收到此错误(更新前未抛出该错误):
reactor.core.Exceptions$ErrorCallbackNotImplemented: org.apache.kafka.common.KafkaException: Failed to construct kafka producer
    Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka producer

沿着堆栈跟踪,根本原因是:

Caused by: javax.security.auth.login.LoginException: No LoginModule found for org.apache.kafka.common.security.scram.ScramLoginModule

是否存在依赖版本不兼容的问题? 我使用依赖分析器检查了 intellij,但使用 kafka 的所有依赖项都使用相同的版本 All kafka dependencies here 这是我在配置类中的代码:

@Configuration
@AllArgsConstructor
public class KafkaProducerConfiguration {

    private KafkaProperties properties;

    @Autowired
    public KafkaProducerConfiguration(KafkaProperties properties) {
        this.properties = properties;
    }

    @Bean
    public ReactiveKafkaProducerTemplate<String, String> kafkaProducerTemplate() {
        Map<String, Object> producerProps = properties.buildProducerProperties();
        return new ReactiveKafkaProducerTemplate<>(SenderOptions.create(producerProps));
    }

}

我在 application.yml 中的属性如下所示:

spring:
  ...
  kafka:
    bootstrap-servers: my-bootstrap-server
    ssl:
      protocol: TLSv1.2
    properties:
      security.protocol: SASL_SSL
      sasl.mechanism: SCRAM-SHA-512
      sasl.jaas.config: org.apache.kafka.common.security.scram.ScramLoginModule required username="my_user" password="my_password";
      ssl.enabled.protocols: TLSv1.2
      ssl.endpoint.identification.algorithm: HTTPS
    producer:
      client-id: my-client

通过在溢出时查看此错误,我尝试添加

Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
在返回之前,但我仍然收到错误。 我还尝试将 ContextClassLoader 设置为 null,什么也没有。

java spring-kafka dependency-management
1个回答
0
投票

尝试将其添加到配置中:

@Configuration

公共类 KafkaConfig {

@Value(value = "${spring.kafka.bootstrap-servers}")
private String bootstrapAddress;

@Bean
public KafkaAdmin kafkaAdmin() {
    Map<String, Object> configs = new HashMap<>();
    configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    return new KafkaAdmin(configs);
}

}

对于制作人:

  @Bean
public ProducerFactory<String, Message> producerFactory() {
    Map<String, Object> configProps = new HashMap<>();
    configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    return new DefaultKafkaProducerFactory<>(configProps);
}

@Bean
public KafkaTemplate<String, Message> kafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
}

PS:我正在使用外部对象而不是字符串

© www.soinside.com 2019 - 2024. All rights reserved.