org.springframework.context.ApplicationContextException:无法启动bean'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry

问题描述 投票:0回答:3

我正在开发 Spring Boot + Apache Kafka + Apache Zookeeper 示例。我已经在本地 Windows 计算机上安装/设置了 Apache Zookeeper 和 Apache Kafka。我从链接中获取了参考:https://www.tutorialspoint.com/spring_boot/spring_boot_apache_kafka.htm并按原样执行代码:

设置:https://medium.com/@shaaslam/installing-apache-kafka-on-windows-495f6f2fd3c8

错误:

org.springframework.context.ApplicationContextException: Failed to start bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry'; nested exception is org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata
    at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:185) ~[spring-context-5.1.3.RELEASE.jar:5.1.3.RELEASE]
    at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:53) ~[spring-context-5.1.3.RELEASE.jar:5.1.3.RELEASE]
    at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:360) ~[spring-context-5.1.3.RELEASE.jar:5.1.3.RELEASE]
    at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:158) ~[spring-context-5.1.3.RELEASE.jar:5.1.3.RELEASE]
    at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:122) ~[spring-context-5.1.3.RELEASE.jar:5.1.3.RELEASE]
    at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:879) ~[spring-context-5.1.3.RELEASE.jar:5.1.3.RELEASE]
    at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:549) ~[spring-context-5.1.3.RELEASE.jar:5.1.3.RELEASE]
    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:775) [spring-boot-2.1.1.RELEASE.jar:2.1.1.RELEASE]
    at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:397) [spring-boot-2.1.1.RELEASE.jar:2.1.1.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:316) [spring-boot-2.1.1.RELEASE.jar:2.1.1.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1260) [spring-boot-2.1.1.RELEASE.jar:2.1.1.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1248) [spring-boot-2.1.1.RELEASE.jar:2.1.1.RELEASE]
    at com.tutorialspoint.SpringKafkaTutorialspointApplication.main(SpringKafkaTutorialspointApplication.java:21) [classes/:na]
Caused by: org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata

来自 Zookeeper 服务器的日志:

2018-12-14 22:16:35,352 [myid:] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@1040] - Closed socket connection for client /127.0.0.1:60197 (no session established for client)
2018-12-14 22:16:36,260 [myid:] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxnFactory@215] - Accepted socket connection from /127.0.0.1:60200
2018-12-14 22:16:36,260 [myid:] - WARN  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@383] - Exception causing close of session 0x0: null
2018-12-14 22:16:36,262 [myid:] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@1040] - Closed socket connection for client /127.0.0.1:60200 (no session established for client)
2018-12-14 22:16:37,473 [myid:] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxnFactory@215] - Accepted socket connection from /127.0.0.1:60204
2018-12-14 22:16:37,473 [myid:] - WARN  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@383] - Exception causing close of session 0x0: null
2018-12-14 22:16:37,476 [myid:] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@1040] - Closed socket connection for client /127.0.0.1:60204 (no session established for client)
2018-12-14 22:16:38,383 [myid:] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxnFactory@215] - Accepted socket connection from /127.0.0.1:60207
2018-12-14 22:16:38,384 [myid:] - WARN  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@383] - Exception causing close of session 0x0: null
2018-12-14 22:16:38,388 [myid:] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@1040] - Closed socket connection for client /127.0.0.1:60207 (no session established for client)
2018-12-14 22:16:39,494 [myid:] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxnFactory@215] - Accepted socket connection from /127.0.0.1:60211
2018-12-14 22:16:39,494 [myid:] - WARN  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@383] - Exception causing close of session 0x0: null
2018-12-14 22:16:39,497 [myid:] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@1040] - Closed socket connection for client /127.0.0.1:60211 (no session established for client)
2018-12-14 22:16:40,506 [myid:] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxnFactory@215] - Accepted socket connection from /127.0.0.1:60214
2018-12-14 22:16:40,507 [myid:] - WARN  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@383] - Exception causing close of session 0x0: null
2018-12-14 22:16:40,509 [myid:] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@1040] - Closed socket connection for client /127.0.0.1:60214 (no session established for client)
2018-12-14 22:16:41,519 [myid:] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxnFactory@215] - Accepted socket connection from /127.0.0.1:60217
2018-12-14 22:16:41,519 [myid:] - WARN  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@383] - Exception causing close of session 0x0: null
2018-12-14 22:16:41,525 [myid:] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@1040] - Closed socket connection for client /127.0.0.1:60217 (no session established for client)

KafkaProducerConfig.java

@Configuration
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, String> producerFactory(){
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

KafkaConsumerConfig.java

@EnableKafka
@Configuration
public class KafkaConsumerConfig {
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2181");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-id");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}

SpringKafkaTutorialspointApplication.java

@SpringBootApplication
public class SpringKafkaTutorialspointApplication implements CommandLineRunner{

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
    
    public void sendMessage(String message) {
        kafkaTemplate.send("tutorialspoint", message);
    }
    
    public static void main(String[] args) {
        SpringApplication.run(SpringKafkaTutorialspointApplication.class, args);
    }

    @Override
    public void run(String... args) throws Exception {
        sendMessage("Hi Welcome to Spring For Apache Kafka @@@@@@@@@");
    }
    
    @KafkaListener(topics = "tutorialspoint", groupId = "group-id")
    public void listen(String message) {
        System.out.println("Received Messasge in group - group-id: " + message);
    }
}

接下来我可以尝试什么来解决这个问题?

spring spring-boot apache-kafka apache-zookeeper
3个回答
6
投票

花了很多时间后,我发现我也应该在

KafkaConsumerConfig
中使用下面的内容。

通过以下更改,代码可以正常工作。

props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

3
投票

该问题可能是由于Spring boot版本和Kafka版本不匹配导致的。 我能够用以下矩阵来解决。

春季启动版本:

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.4.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

Spring boot版本的Kafka版本maven依赖

    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency> 

卡夫卡版本:kafka_2.12-2.2.0
Zookeeper版本:apache-zookeeper-3.5.5

Application.yml 文件详细信息如下:

server:
  port: 9000
spring:
  kafka:
    consumer:
      bootstrap-servers:
      - localhost:9092
      group-id: group_id
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      bootstrap-servers:
      - localhost:9092
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer 

0
投票

我已经编写了kafkalistener,并且在给出开始的主题名称后,传递的主题名称为空或为空

© www.soinside.com 2019 - 2024. All rights reserved.