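I recently upgraded one of my Spring Boot services to 3.1.x, and since the upgrade I have a Kafka problem. The client cannot connect and keeps writing the following log lines.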
2024-01-03T06:18:46.313Z WARN 1 --- [servicerequestorms] [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-Dev_Phoenix_ServiceRequestor_CG-1, groupId=Dev_Phoenix_ServiceRequestor_CG] Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected
2024-01-03T06:18:47.403Z INFO 1 --- [servicerequestorms] [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-Dev_Phoenix_ServiceRequestor_CG-1, groupId=Dev_Phoenix_ServiceRequestor_CG] Node -1 disconnected.
2024-01-03T06:18:47.403Z WARN 1 --- [servicerequestorms] [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-Dev_Phoenix_ServiceRequestor_CG-1, groupId=Dev_Phoenix_ServiceRequestor_CG] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
When I switch back to Spring Boot 3.0.x, everything works fine. I am using spring-kafka 3.1.1 in my project. I also tried the latest Spring Boot 3.2.1 and got the same result, so the problem occurs only with Spring Boot 3.1.x and later.
Here are my Spring Kafka properties:
kafka:
  consumer:
    key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    value-deserializer: io.confluent.kafka.serializers.KafkaJsonDeserializer
    group-id: Dev_Phoenix_ServiceRequestor_CG
  producer:
    key-serializer: org.apache.kafka.common.serialization.StringSerializer
    value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
  properties:
    ssl.endpoint.identification.algorithm: https
    sasl.mechanism: PLAIN
    bootstrap.servers: xyz.eastus.azure.confluent.cloud:9092
    sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username='username' password='password';
    security.protocol: SASL_SSL
    schema.registry.url: https://xyz.westus2.azure.confluent.cloud
    basic.auth.credentials.source: USER_INFO
    schema.registry.basic.auth.user.info: M2T3WWJYIPFI5NBO:qSZikyV70b1h7tEHtmenmLTje7+aeUi6h+ZwRZ6scwLYUzzo9EhI70bk8OuWEH4Y
    #specific.avro.reader: true
    json.value.type: com.xxx.phoenix.events.DomainEvent
Here is the KafkaAdmin configuration:
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaAdmin;

@Configuration
public class DomainEventsKafkaTopicConfig {

    // Connection settings are read from the generic spring.kafka.properties.* map.
    @Value("${spring.kafka.properties.bootstrap.servers}")
    private String bootstrapAddress;

    @Value("${spring.kafka.properties.sasl.jaas.config}")
    private String phoenixClusterJAASConfig;

    @Value("${spring.kafka.properties.sasl.mechanism:PLAIN}")
    private String phoenixClusterSASLMechanism;

    @Value("${spring.kafka.properties.security.protocol:SASL_SSL}")
    private String phoenixClusterSecurityProtocol;

    // Topic settings; partitions and replication factor default to 3.
    @Value("${phoenix.domain.event.topic.name}")
    private String domainEventTopic;

    @Value("${phoenix.domain.event.topic.partitions:3}")
    private int domainEventTopicPartitions;

    @Value("${phoenix.domain.event.topic.replicationFactor:3}")
    private int domainEventTopicReplicationFactor;

    public DomainEventsKafkaTopicConfig() {
    }

    // Admin client configured explicitly, independent of Boot's auto-configured consumer.
    @Bean
    public KafkaAdmin kafkaAdmin() {
        Map<String, Object> configs = new HashMap<>();
        configs.put("bootstrap.servers", this.bootstrapAddress);
        configs.put("security.protocol", this.phoenixClusterSecurityProtocol);
        configs.put("sasl.mechanism", this.phoenixClusterSASLMechanism);
        configs.put("sasl.jaas.config", this.phoenixClusterJAASConfig);
        return new KafkaAdmin(configs);
    }

    // Topic that KafkaAdmin creates on startup if it does not exist yet.
    @Bean
    public NewTopic topic1() {
        return new NewTopic(this.domainEventTopic, this.domainEventTopicPartitions,
                (short) this.domainEventTopicReplicationFactor);
    }
}
I am stuck and cannot upgrade the Spring Boot version. Any help would be greatly appreciated.
The configuration should look like this:
spring:
  kafka:
    bootstrap-servers: xyz.eastus.azure.confluent.cloud:9092
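Starting with Spring Boot 3.1.0 there is a ConnectionDetails abstraction, and a bootstrap.servers entry set under spring.kafka.properties is no longer honored: the auto-configuration overwrites it with the value taken from the connection details.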
private void applyKafkaConnectionDetailsForConsumer(Map<String, Object> properties,
        KafkaConnectionDetails connectionDetails) {
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, connectionDetails.getConsumerBootstrapServers());
    if (!(connectionDetails instanceof PropertiesKafkaConnectionDetails)) {
        properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "PLAINTEXT");
    }
}
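To make the override order concrete, here is a minimal plain-Java sketch of what I understand happens to the consumer properties. It is not Spring Boot code: the class BootstrapOverrideSketch and the method buildConsumerProperties are hypothetical names invented for illustration. It assumes that the connection details fall back to localhost:9092 when spring.kafka.bootstrap-servers is not set, which would explain the broker address in the logs from the question.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java model of the override order. This is NOT Spring Boot code;
// all names in this class are hypothetical and exist only for illustration.
public class BootstrapOverrideSketch {

    // Assumed fallback when spring.kafka.bootstrap-servers is not set.
    static final String DEFAULT_BOOTSTRAP_SERVERS = "localhost:9092";

    /**
     * Starts from the generic spring.kafka.properties.* entries and then
     * unconditionally overwrites bootstrap.servers with the value from the
     * connection details, mirroring the properties.put(...) call quoted above.
     *
     * @param springKafkaBootstrapServers value of spring.kafka.bootstrap-servers, or null if unset
     * @param genericProperties entries from spring.kafka.properties.*
     */
    static Map<String, Object> buildConsumerProperties(String springKafkaBootstrapServers,
            Map<String, Object> genericProperties) {
        Map<String, Object> properties = new HashMap<>(genericProperties);
        String fromConnectionDetails = (springKafkaBootstrapServers != null)
                ? springKafkaBootstrapServers
                : DEFAULT_BOOTSTRAP_SERVERS;
        // Whatever the generic map contained for this key is replaced here.
        properties.put("bootstrap.servers", fromConnectionDetails);
        return properties;
    }

    public static void main(String[] args) {
        Map<String, Object> generic = new HashMap<>();
        generic.put("bootstrap.servers", "xyz.eastus.azure.confluent.cloud:9092");
        generic.put("security.protocol", "SASL_SSL");

        // Only spring.kafka.properties.bootstrap.servers is set: the default wins.
        System.out.println(buildConsumerProperties(null, generic).get("bootstrap.servers"));

        // spring.kafka.bootstrap-servers is set: the configured broker is used.
        System.out.println(buildConsumerProperties("xyz.eastus.azure.confluent.cloud:9092", generic)
                .get("bootstrap.servers"));
    }
}
```

In this model the first call prints localhost:9092 and the second prints the Confluent Cloud address, while the other entries such as security.protocol pass through unchanged. That is why moving only the broker address to spring.kafka.bootstrap-servers should be enough, and the SASL settings can stay under spring.kafka.properties.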