Kafka Consumer没有在Spring Boot中收到消息

问题描述 投票:1回答:1

我的spring / java使用者无法访问生产者生成的消息。但是,当我从console / terminal运行使用者时,它能够接收spring / java producer生成的消息。

消费者配置:

@Component
@ConfigurationProperties(prefix="kafka.consumer")
public class KafkaConsumerProperties {

    private String bootstrap;
    private String group;
    private String topic;

    public String getBootstrap() {
        return bootstrap;
    }

    public void setBootstrap(String bootstrap) {
        this.bootstrap = bootstrap;
    }

    public String getGroup() {
        return group;
    }

    public void setGroup(String group) {
        this.group = group;
    }

    public String getTopic() {
        return topic;
    }

    public void setTopic(String topic) {
        this.topic = topic;
    }
}

监听器配置:

@Configuration
@EnableKafka
public class KafkaListenerConfig {

    @Autowired
    private KafkaConsumerProperties kafkaConsumerProperties;

    @Bean
    public Map<String, Object> getConsumerProperties() {
        Map<String, Object> properties = new HashMap<>();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConsumerProperties.getBootstrap());
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaConsumerProperties.getGroup());
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
        properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        return properties;
    }

    @Bean
    public Deserializer stringKeyDeserializer() {
        return new StringDeserializer();
    }

    @Bean
    public Deserializer transactionJsonValueDeserializer() {
        return new JsonDeserializer(Transaction.class);
    }

    @Bean
    public ConsumerFactory<String, Transaction> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(getConsumerProperties(), stringKeyDeserializer(), transactionJsonValueDeserializer());
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Transaction> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Transaction> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConcurrency(1);
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

}

卡夫卡听众:

@Service
public class TransactionConsumer {
    private static final Logger LOGGER = LoggerFactory.getLogger(Transaction.class);

    @KafkaListener(topics={"transactions"}, containerFactory = "kafkaListenerContainerFactory")
    public void onReceive(Transaction transaction) {
        LOGGER.info("transaction = {}",transaction);
    }
}

消费者应用:

@SpringBootApplication
public class ConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(ConsumerApplication.class, args);
    }

}

测试案例1:通过我开始我的spring / java生产者并从控制台运行消费者。当我生成消息表单生成器时,我的控制台消费者能够访问该消息。

测试案例2:失败我开始使用spring / java使用者并从控制台运行生产者。当我生成消息表单控制台生成器时,我的spring / java使用者无法访问该消息。

测试案例3:失败我开始使用spring / java使用者并运行spring / java生成器。当我生成消息表单spring / java producer时,我的spring / java使用者无法访问该消息。

  1. 我的消费者代码有什么问题吗?
  2. 我错过了我的kafka-listener的任何配置吗?
  3. 我是否需要显式运行监听器? (我不这么认为,因为我可以看到连接到主题的终端日志,但我不确定)
spring spring-boot apache-kafka kafka-consumer-api
1个回答
0
投票

好的,你在AUTO_OFFSET_RESET_CONFIG缺少Consumer Configs

properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

auto.offset.reset

当Kafka中没有初始偏移量或者服务器上不再存在当前偏移量时(例如因为该数据已被删除)该怎么办:

最早:自动将偏移重置为最早的偏移量

最新:自动将偏移重置为最新偏移

none:如果没有找到消费者组的先前偏移量,则向消费者抛出异常

其他:向消费者抛出异常

注意:auto.offset.resetearliest只有在kafka没有该消费者组的偏移时才会起作用(所以在你的情况下需要用新的消费者组添加这个属性并重新启动应用程序)

© www.soinside.com 2019 - 2024. All rights reserved.