即使组 id 和客户端 id 设置正确,Kafka Consumer 也没有收到来自 Topic 的任何消息

问题描述 投票:0回答:1

我使用 @InboundChannelAdapter 从主题轮询消息,但如果我从命令行发布 JSON,则收不到任何消息;而如果我发送任意纯文本,则会抛出异常。我希望接收 JSON 对象 ({"name" : "foo"}) 并将其转换为 CreateResponse 类。

向主题发送消息的命令:

C:\kafka_2.13-3.5.1\bin\windows>kafka-console-producer.bat --broker-list localhost:9092 --topic myTopic
>{"name" : "foo"}
>sdwadaw

代码:

package com.cae.egca.connector.config;



/**
 * Spring configuration that polls the {@code myTopic} Kafka topic through a
 * {@link KafkaMessageSource}, converts each JSON record value to a
 * {@code CreateResponse} and hands the resulting message to {@link #consumeIt}
 * via the {@code inputChannel} queue channel.
 */
@Configuration
@EnableKafka
@Slf4j
public class KafkaConfig {

    /**
     * Message converter used by the Kafka message source.
     *
     * @return a converter for String record values containing JSON
     */
    @Bean
    RecordMessageConverter messageConverter() {
        return new StringJsonMessageConverter();
    }

    /**
     * Polled Kafka source for {@code myTopic}; polled every 5000 ms and publishing to
     * {@code inputChannel}.
     *
     * @param consumerFactory  factory used to create the underlying Kafka consumer
     * @param messageConverter converter applied to each consumed record
     * @return the configured message source
     */
    @Bean
    @InboundChannelAdapter(channel = "inputChannel", poller = @Poller(fixedDelay = "5000"))
    public KafkaMessageSource<String, String> consumeMsg(ConsumerFactory<String, String> consumerFactory,
                                                         RecordMessageConverter messageConverter) {

        KafkaMessageSource<String, String> kafkaMessageSource = new KafkaMessageSource<>(consumerFactory,
                new ConsumerProperties("myTopic"));
        kafkaMessageSource.getConsumerProperties().setGroupId("myGroupId");
        kafkaMessageSource.getConsumerProperties().setClientId("myClientId");
        kafkaMessageSource.setMessageConverter(messageConverter);
        // Target type the converter is asked to produce for every record value.
        kafkaMessageSource.setPayloadType(CreateResponse.class);

        return kafkaMessageSource;
    }

    /**
     * Consumer factory for String keys and String values; JSON conversion is left to
     * the message converter configured on the message source, so no JSON deserializer
     * settings are supplied here.
     *
     * @return a consumer factory connected to {@code localhost:9092}
     */
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 60000);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10);
        props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 10000);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000);
        // Same spelling as the group id set on the message source, so both places agree.
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "myGroupId");
        // NOTE(review): with "latest" and no committed offset, records produced before the
        // consumer's first partition assignment are skipped; use "earliest" if those
        // records must be consumed as well.
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");

        return new DefaultKafkaConsumerFactory<>(props);
    }

    /**
     * Handles each message arriving on {@code inputChannel}: tags the logging context
     * with a fresh transaction id, processes the payload and acknowledges the record.
     *
     * <p>NOTE(review): the message source converts payloads to {@code CreateResponse}
     * while this handler declares {@code SftpOutboundFilesDetails} - confirm the two
     * types are meant to differ.
     *
     * @param cr             the converted message payload
     * @param acknowledgment handle used to acknowledge the record after processing
     * @throws JSchException      propagated from file processing
     * @throws URISyntaxException propagated from file processing
     * @throws IOException        propagated from file processing
     */
    @ServiceActivator(inputChannel = "inputChannel")
    void consumeIt(@Payload SftpOutboundFilesDetails cr, @Header(KafkaHeaders.ACKNOWLEDGMENT) Acknowledgment acknowledgment) throws JSchException, URISyntaxException, IOException {
        log.info("In SERVICE ACTIVATOR ");
        MDC.put("transaction.id", String.valueOf(UUID.randomUUID()));
        fileServices.processFile(cr);
        // Acknowledge only after processing completed without throwing.
        acknowledgment.acknowledge();
        log.info("ACKNOWLEDGED: ");
    }

    /**
     * Queue channel that buffers messages between the Kafka source and the service
     * activator.
     *
     * @return a new queue channel
     */
    @Bean
    QueueChannel inputChannel() {
        return new QueueChannel();
    }

}

异常:

Caused by: org.springframework.kafka.support.converter.ConversionException: Failed to convert from JSON
    at org.springframework.kafka.support.converter.JsonMessageConverter.extractAndConvertValue(JsonMessageConverter.java)
    ...
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.pollForMessage(AbstractPollingEndpoint.java:412)
    ... 14 more
Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'sdwadaw': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
 at [Source: (String)"sdwadaw"; line: 1, column: 8]
    at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:2477)
    at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:760)
    at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._reportInvalidToken(ReaderBasedJsonParser.java:3041)
    ... 21 more



spring-boot kafka-consumer-api spring-kafka
1个回答
0
投票

这对我来说效果很好...

/**
 * Minimal Spring Boot application demonstrating a polled Kafka source: one JSON
 * record is sent to {@code myTopic} at startup, converted to a
 * {@link CreateResponse} and printed by the service activator.
 */
@SpringBootApplication
public class So76876015Application {

    /**
     * Application entry point.
     *
     * @param args command-line arguments passed through to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(So76876015Application.class, args);
    }

    /**
     * Converter for String record values that contain JSON.
     *
     * @return the record message converter
     */
    @Bean
    RecordMessageConverter messageConverter() {
        return new StringJsonMessageConverter();
    }

    /**
     * Polled source reading {@code myTopic} every 5000 ms and publishing to
     * {@code inputChannel}.
     *
     * @param consumerFactory  factory for the underlying Kafka consumer
     * @param messageConverter converter applied to every consumed record
     * @return the configured message source
     */
    @Bean
    @InboundChannelAdapter(channel = "inputChannel", poller = @Poller(fixedDelay = "5000"))
    KafkaMessageSource<String, String> consumeMsg(ConsumerFactory<String, String> consumerFactory,
            RecordMessageConverter messageConverter) {

        KafkaMessageSource<String, String> source =
                new KafkaMessageSource<>(consumerFactory, new ConsumerProperties("myTopic"));

        ConsumerProperties consumerProps = source.getConsumerProperties();
        consumerProps.setGroupId("myGroupId");
        consumerProps.setClientId("myClientId");

        source.setMessageConverter(messageConverter);
        source.setPayloadType(CreateResponse.class);
        return source;
    }

    /**
     * Prints every payload received on {@code inputChannel}.
     *
     * @param cr the converted payload
     */
    @ServiceActivator(inputChannel = "inputChannel")
    void consumeIt(CreateResponse cr) {
        System.out.println(cr);
    }

    /**
     * Declares {@code myTopic} with a single partition and a single replica.
     *
     * @return the topic definition
     */
    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("myTopic")
                .partitions(1)
                .replicas(1)
                .build();
    }

    /**
     * Sends one sample JSON record to {@code myTopic} once the application has started.
     *
     * @param template Kafka template used for sending
     * @return the startup runner
     */
    @Bean
    ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> template.send("myTopic", "{\"name\":\"foo\"}");
    }

    /** Payload type the JSON record value is converted to. */
    public static class CreateResponse {

        private String name;

        /**
         * @return the current name
         */
        protected String getName() {
            return name;
        }

        /**
         * @param name the name to store
         */
        protected void setName(String name) {
            this.name = name;
        }

        @Override
        public String toString() {
            return "CreateResponse [name=" + name + "]";
        }

    }

}
spring.kafka.consumer.auto-offset-reset=earliest
CreateResponse [name=foo]
© www.soinside.com 2019 - 2024. All rights reserved.