Kafka client connection problem on Spring Boot 3.1.x and above


I recently upgraded one of my Spring Boot services to 3.1.x, and after the upgrade I ran into a Kafka problem. The client cannot seem to connect and keeps emitting the following logs.


2024-01-03T06:18:46.313Z  WARN 1 --- [servicerequestorms] [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-Dev_Phoenix_ServiceRequestor_CG-1, groupId=Dev_Phoenix_ServiceRequestor_CG] Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected
2024-01-03T06:18:47.403Z  INFO 1 --- [servicerequestorms] [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-Dev_Phoenix_ServiceRequestor_CG-1, groupId=Dev_Phoenix_ServiceRequestor_CG] Node -1 disconnected.
2024-01-03T06:18:47.403Z  WARN 1 --- [servicerequestorms] [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-Dev_Phoenix_ServiceRequestor_CG-1, groupId=Dev_Phoenix_ServiceRequestor_CG] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.

But when I switch back to Spring Boot 3.0.x, it works fine. I am using spring-kafka 3.1.1 in my project. I also tried the latest Spring Boot 3.2.1, and it shows the same problem; the issue only occurs on Spring Boot 3.1.x and above.

Here are my Spring Kafka properties:

kafka:
  consumer:
    key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    value-deserializer: io.confluent.kafka.serializers.KafkaJsonDeserializer
    group-id: Dev_Phoenix_ServiceRequestor_CG
  producer:
    key-serializer: org.apache.kafka.common.serialization.StringSerializer
    value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
  properties:
    ssl.endpoint.identification.algorithm: https
    sasl.mechanism: PLAIN
    bootstrap.servers: xyz.eastus.azure.confluent.cloud:9092
    sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username='username' password='password';
    security.protocol: SASL_SSL
    schema.registry.url: https://xyz.westus2.azure.confluent.cloud  
    basic.auth.credentials.source: USER_INFO
    schema.registry.basic.auth.user.info: M2T3WWJYIPFI5NBO:qSZikyV70b1h7tEHtmenmLTje7+aeUi6h+ZwRZ6scwLYUzzo9EhI70bk8OuWEH4Y
    #specific.avro.reader: true
    json.value.type: com.xxx.phoenix.events.DomainEvent

Here is the KafkaAdmin configuration:

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaAdmin;

@Configuration
public class DomainEventsKafkaTopicConfig {
    @Value("${spring.kafka.properties.bootstrap.servers}")
    private String bootstrapAddress;
    @Value("${spring.kafka.properties.sasl.jaas.config}")
    private String phoenixClusterJAASConfig;
    @Value("${spring.kafka.properties.sasl.mechanism:PLAIN}")
    private String phoenixClusterSASLMechanism;
    @Value("${spring.kafka.properties.security.protocol:SASL_SSL}")
    private String phoenixClusterSecurityProtocol;
    @Value("${phoenix.domain.event.topic.name}")
    private String domainEventTopic;
    @Value("${phoenix.domain.event.topic.partitions:3}")
    private int domainEventTopicPartitions;
    @Value("${phoenix.domain.event.topic.replicationFactor:3}")
    private int domainEventTopicReplicationFactor;

    @Bean
    public KafkaAdmin kafkaAdmin() {
        Map<String, Object> configs = new HashMap<>();
        configs.put("bootstrap.servers", this.bootstrapAddress);
        configs.put("security.protocol", this.phoenixClusterSecurityProtocol);
        configs.put("sasl.mechanism", this.phoenixClusterSASLMechanism);
        configs.put("sasl.jaas.config", this.phoenixClusterJAASConfig);
        return new KafkaAdmin(configs);
    }

    @Bean
    public NewTopic topic1() {
        return new NewTopic(this.domainEventTopic, this.domainEventTopicPartitions, (short)this.domainEventTopicReplicationFactor);
    }
}

I am stuck, unable to upgrade the Spring Boot version; any help would be appreciated.

spring-boot spring-kafka

1 Answer

It should be like this:

spring:
  kafka:
    bootstrap-servers: xyz.eastus.azure.confluent.cloud:9092
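
Applied to the configuration from the question, that means moving the broker address out of the raw properties block and into the dedicated spring.kafka.bootstrap-servers key, while the SASL/SSL settings stay where they are. A sketch, reusing the endpoints from the question (abbreviated; the schema registry and serializer entries are unchanged):

```yaml
spring:
  kafka:
    # Spring Boot 3.1+ reads the broker address from here, not from properties.*
    bootstrap-servers: xyz.eastus.azure.confluent.cloud:9092
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: io.confluent.kafka.serializers.KafkaJsonDeserializer
      group-id: Dev_Phoenix_ServiceRequestor_CG
    properties:
      # bootstrap.servers removed from this block
      security.protocol: SASL_SSL
      sasl.mechanism: PLAIN
      sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username='username' password='password';
```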

Starting with Spring Boot 3.1.0 there is a ConnectionDetails abstraction, and the bootstrap.servers Kafka property is no longer used from now on:

private void applyKafkaConnectionDetailsForConsumer(Map<String, Object> properties,
        KafkaConnectionDetails connectionDetails) {
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, connectionDetails.getConsumerBootstrapServers());
    if (!(connectionDetails instanceof PropertiesKafkaConnectionDetails)) {
        properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "PLAINTEXT");
    }
}
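
The properties-backed implementation only consults the dedicated spring.kafka.bootstrap-servers key, which defaults to localhost:9092 — exactly the broker seen in the question's logs. A simplified standalone sketch of that lookup (PropertiesKafkaConnectionDetails and KafkaProperties are real Spring Boot classes, but this version is only an illustration of the lookup order, not the actual source):

```java
import java.util.List;

// Simplified stand-in for org.springframework.boot.autoconfigure.kafka.KafkaProperties:
// spring.kafka.bootstrap-servers defaults to localhost:9092 when not set.
class KafkaProperties {
    private List<String> bootstrapServers = List.of("localhost:9092");

    List<String> getBootstrapServers() {
        return bootstrapServers;
    }
}

// Simplified stand-in for PropertiesKafkaConnectionDetails: note that the
// spring.kafka.properties.* map is never consulted for the broker address.
class PropertiesKafkaConnectionDetails {
    private final KafkaProperties properties;

    PropertiesKafkaConnectionDetails(KafkaProperties properties) {
        this.properties = properties;
    }

    List<String> getConsumerBootstrapServers() {
        return properties.getBootstrapServers();
    }
}

public class ConnectionDetailsSketch {
    public static void main(String[] args) {
        KafkaProperties props = new KafkaProperties();
        PropertiesKafkaConnectionDetails details = new PropertiesKafkaConnectionDetails(props);
        // With bootstrap-servers unset, the consumer falls back to the default broker.
        System.out.println(details.getConsumerBootstrapServers()); // prints [localhost:9092]
    }
}
```

This is why the service tries localhost:9092 even though bootstrap.servers is present under spring.kafka.properties.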