出现错误:无法构建 Kafka 消费者

问题描述 投票:0回答:1

由以下原因引起:org.apache.kafka.common.config.ConfigException: No resolvable bootstrap urls given in bootstrap.servers(bootstrap.servers 中没有给出可解析的引导 URL)

当我在配置中添加此内容时:

@Configuration
class TopicConfiguration {

private ConsumarFactory<String, String> creteConsumer(String id) {
Map<String, String> config = new hashMap<>();
config.put(ConsumerConfig.BOOTSTRAP_SERVER_CONFIG, configProperties.getBootstrapServers);

而当我改为添加下面这一行(硬编码地址)时:

config.put(ConsumerConfig.BOOTSTRAP_SERVER_CONFIG, "localhost://8080"));

它可以正常工作,但我需要从属性文件中读取引导服务器(bootstrap servers)地址。

java spring-boot apache-kafka spring-kafka kafka-consumer-api
1个回答
0
投票

可以使用下面的 KafkaConfig 配置类。

我使用了一个名为 KafkaTransferDto 的自定义对象


/**
 * Mutable transfer object that pairs an arbitrary payload with a topic string.
 *
 * <p>Both properties start out as {@code null} and are populated through the setters.
 */
public class KafkaTransferDto {

    /** Payload carried by this transfer object; any type is accepted. */
    private Object object;

    /** Topic string associated with the payload. */
    private String topic;

    /**
     * Returns the payload currently held.
     *
     * @return the payload, or {@code null} if none has been set
     */
    public Object getObject() {
        return this.object;
    }

    /**
     * Replaces the payload.
     *
     * @param object the new payload; {@code null} is stored as-is
     */
    public void setObject(final Object object) {
        this.object = object;
    }

    /**
     * Returns the topic string currently held.
     *
     * @return the topic, or {@code null} if none has been set
     */
    public String getTopic() {
        return this.topic;
    }

    /**
     * Replaces the topic string.
     *
     * @param topic the new topic; {@code null} is stored as-is
     */
    public void setTopic(final String topic) {
        this.topic = topic;
    }
}
import com.pns.dto.KafkaTransferDto;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.*;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.support.serializer.JsonSerializer;
import java.util.HashMap;
import java.util.Map;


/**
 * Spring configuration that wires the Kafka consumer and producer infrastructure
 * for messages with {@code String} keys and {@link KafkaTransferDto} values.
 *
 * <p>The broker list is not hard-coded: it is injected from the
 * {@code spring.kafka.bootstrap-servers} property.
 */
@Configuration
public class KafkaConfig {

    /** Consumer group id applied to every consumer built by {@link #consumerFactory()}. */
    private static final String GROUP_ID = "PNS";

    /** Bootstrap server list, injected from {@code spring.kafka.bootstrap-servers}. */
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapAddress;

    /**
     * Creates the listener container factory backed by {@link #consumerFactory()}.
     *
     * @return a container factory for {@code String}/{@link KafkaTransferDto} records
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, KafkaTransferDto> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, KafkaTransferDto> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

    /**
     * Builds a consumer factory that reads {@code String} keys and JSON-encoded
     * {@link KafkaTransferDto} values from the configured bootstrap servers.
     *
     * <p>NOTE(review): unlike {@link #producerFactory()}, this method is not annotated with
     * {@code @Bean}, so the factory is not registered in the application context by this
     * class. Confirm whether that is intended before changing it.
     *
     * @return a consumer factory using the injected bootstrap address and the fixed group id
     */
    public ConsumerFactory<String, KafkaTransferDto> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);

        // The second constructor argument (useHeadersIfPresent = false) makes the deserializer
        // ignore type headers and always bind to KafkaTransferDto. The constructor already
        // trusts the target type's package, so a wildcard addTrustedPackages("*") is not
        // needed and is deliberately not used here.
        JsonDeserializer<KafkaTransferDto> valueDeserializer =
                new JsonDeserializer<>(KafkaTransferDto.class, false);

        return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), valueDeserializer);
    }

    /**
     * Builds a producer factory that writes {@code String} keys and JSON-encoded
     * {@link KafkaTransferDto} values to the configured bootstrap servers.
     *
     * @return a producer factory using the injected bootstrap address
     */
    @Bean
    public ProducerFactory<String, KafkaTransferDto> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);

        return new DefaultKafkaProducerFactory<>(configProps);
    }

    /**
     * Creates the template used to send {@link KafkaTransferDto} messages.
     *
     * @return a template backed by {@link #producerFactory()}
     */
    @Bean
    public KafkaTemplate<String, KafkaTransferDto> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

这是我的 application.yml:


# Application configuration (Spring-style property tree).
spring:
  # JDBC connection settings; the URL and driver class both point at H2.
  datasource:
    url: jdbc:h2:mem:mydb
    username: sa
    password: password
    driverClassName: org.h2.Driver
  jpa:
    # Hibernate dialect matching the H2 datasource above.
    database-platform: org.hibernate.dialect.H2Dialect
  kafka:
    # Read by KafkaConfig through the ${spring.kafka.bootstrap-servers} placeholder.
    bootstrap-servers: localhost:9092

server:
  port: 8051
© www.soinside.com 2019 - 2024. All rights reserved.