KafkaListener fails with "Caused by: org.springframework.messaging.converter.MessageConversionException" when deserializing to an Avro POJO class


I am using Spring Boot to build a Kafka listener/consumer that consumes Avro data from a topic.

Here are some of the properties:

spring.kafka.properties.specific.avro.reader=true
spring.kafka.consumer.properties.acks=all
spring.kafka.consumer.properties.auto.offset.reset=latest
spring.kafka.properties.schema.registry.url=http://10.0.99.111:8081
spring.kafka.properties.schema.registry.ssl.truststore.location=/sr.truststore.jks
spring.kafka.properties.schema.registry.ssl.truststore.password=password
spring.kafka.properties.auto.register.schemas=false

This is the listener function:

@KafkaListener(topics = "TOPIC", groupId = "GROUP_ID"
            , properties = {
                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG + ":org.apache.kafka.common.serialization.StringDeserializer",
                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG + ":io.confluent.kafka.serializers.KafkaAvroDeserializer"
            })
public void listen(Value message) throws Exception {
    logger.debug("Consumed from kafka {}", message.toString());
}
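
As a side note on the listener above: the Spring Kafka documentation describes the entries of the `properties` attribute as using the same syntax as a Java properties file, where `key:value` and `key=value` are equivalent. So the colon separator is probably not the cause of the error. A minimal sketch using only `java.util.Properties` (the class name `ListenerPropertySyntax` is made up for illustration and is not part of the original code):

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

// Hypothetical helper, for illustration only.
final class ListenerPropertySyntax {

    // Parses a single entry using the Java properties-file rules.
    static Properties parse(String entry) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(entry));
        } catch (IOException e) {
            // Not expected when reading from an in-memory string.
            throw new IllegalStateException(e);
        }
        return props;
    }

    public static void main(String[] args) {
        // "value.deserializer" is the value of ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG.
        String withColon = "value.deserializer:io.confluent.kafka.serializers.KafkaAvroDeserializer";
        String withEquals = "value.deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer";

        // Both separators yield the same key and value.
        System.out.println(parse(withColon).getProperty("value.deserializer"));
        System.out.println(parse(withColon).equals(parse(withEquals))); // true
    }
}
```

This only shows that the entries parse as intended. Whether the listener's consumer factory actually honors these overrides is a separate question that this sketch does not answer.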

Error:

2024-05-06T17:15:23.232+07:00 [APP/PROC/WEB/0] [OUT] Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [[B] to [com.mykafkalistener.Value] for GenericMessage [payload=byte[124], headers={kafka_offset=95, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@2b056e41, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedMessageKey=18, kafka_receivedTopic=TOPIC, kafka_receivedTimestamp=1714990522973, kafka_groupId=GROUP_ID}], failedMessage=GenericMessage [payload=byte[124], headers={kafka_offset=95, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@2b056e41, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedMessageKey=18, kafka_receivedTopic=TOPIC, kafka_receivedTimestamp=1714990522973, kafka_groupId=GROUP_ID}]
2024-05-06T17:15:23.232+07:00 [APP/PROC/WEB/0] [OUT] at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:340)
2024-05-06T17:15:23.232+07:00 [APP/PROC/WEB/0] [OUT] at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:87)
2024-05-06T17:15:23.232+07:00 [APP/PROC/WEB/0] [OUT] at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:52)
2024-05-06T17:15:23.232+07:00 [APP/PROC/WEB/0] [OUT] at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2044)
2024-05-06T17:15:23.232+07:00 [APP/PROC/WEB/0] [OUT] ... 10 common frames omitted
2024-05-06T17:15:23.232+07:00 [APP/PROC/WEB/0] [OUT] Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot convert from [[B] to [com.mykafkalistener.Value] for GenericMessage [payload=byte[124], headers={kafka_offset=95, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@2b056e41, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedMessageKey=18, kafka_receivedTopic=TOPIC, kafka_receivedTimestamp=1714990522973, kafka_groupId=GROUP_ID}]
2024-05-06T17:15:23.232+07:00 [APP/PROC/WEB/0] [OUT] at org.springframework.messaging.handler.annotation.support.PayloadMethodArgumentResolver.resolveArgument(PayloadMethodArgumentResolver.java:145)
2024-05-06T17:15:23.232+07:00 [APP/PROC/WEB/0] [OUT] at org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor$KafkaNullAwarePayloadArgumentResolver.resolveArgument(KafkaListenerAnnotationBeanPostProcessor.java:910)
2024-05-06T17:15:23.232+07:00 [APP/PROC/WEB/0] [OUT] at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:117)
2024-05-06T17:15:23.232+07:00 [APP/PROC/WEB/0] [OUT] at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:148)
2024-05-06T17:15:23.232+07:00 [APP/PROC/WEB/0] [OUT] at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:116)
2024-05-06T17:15:23.232+07:00 [APP/PROC/WEB/0] [OUT] at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:48)
2024-05-06T17:15:23.232+07:00 [APP/PROC/WEB/0] [OUT] at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:329)
2024-05-06T17:15:23.232+07:00 [APP/PROC/WEB/0] [OUT] ... 13 common frames omitted

When I change the function to

listen(ConsumerRecord<String, Value> message)

the error changes to the following, and this offset is then skipped:

2024-05-06T17:15:23.232+07:00 [APP/PROC/WEB/0] [OUT] Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [[B] to [org.apache.kafka.clients.consumer.ConsumerRecords] for GenericMessage [payload=byte[124], headers={kafka_offset=95, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@2b056e41, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedMessageKey=18, kafka_receivedTopic=TOPIC, kafka_receivedTimestamp=1714990522973, kafka_groupId=GROUP_ID}], failedMessage=GenericMessage [payload=byte[124], headers={kafka_offset=95, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@2b056e41, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedMessageKey=18, kafka_receivedTopic=TOPIC, kafka_receivedTimestamp=1714990522973, kafka_groupId=GROUP_ID}]
2024-05-06T17:15:23.232+07:00 [APP/PROC/WEB/0] [OUT] at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:340)
2024-05-06T17:15:23.232+07:00 [APP/PROC/WEB/0] [OUT] at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:87)
2024-05-06T17:15:23.232+07:00 [APP/PROC/WEB/0] [OUT] at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:52)
2024-05-06T17:15:23.232+07:00 [APP/PROC/WEB/0] [OUT] at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2044)
2024-05-06T17:15:23.232+07:00 [APP/PROC/WEB/0] [OUT] ... 10 common frames omitted
2024-05-06T17:15:23.232+07:00 [APP/PROC/WEB/0] [OUT] Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot convert from [[B] to [org.apache.kafka.clients.consumer.ConsumerRecords] for GenericMessage [payload=byte[124], headers={kafka_offset=95, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@2b056e41, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedMessageKey=18, kafka_receivedTopic=TOPIC, kafka_receivedTimestamp=1714990522973, kafka_groupId=GROUP_ID}]
2024-05-06T17:15:23.232+07:00 [APP/PROC/WEB/0] [OUT] at org.springframework.messaging.handler.annotation.support.PayloadMethodArgumentResolver.resolveArgument(PayloadMethodArgumentResolver.java:145)
2024-05-06T17:15:23.232+07:00 [APP/PROC/WEB/0] [OUT] at org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor$KafkaNullAwarePayloadArgumentResolver.resolveArgument(KafkaListenerAnnotationBeanPostProcessor.java:910)
2024-05-06T17:15:23.232+07:00 [APP/PROC/WEB/0] [OUT] at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:117)
2024-05-06T17:15:23.232+07:00 [APP/PROC/WEB/0] [OUT] at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:148)
2024-05-06T17:15:23.232+07:00 [APP/PROC/WEB/0] [OUT] at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:116)
2024-05-06T17:15:23.232+07:00 [APP/PROC/WEB/0] [OUT] at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:48)
2024-05-06T17:15:23.232+07:00 [APP/PROC/WEB/0] [OUT] at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:329)
2024-05-06T17:15:23.232+07:00 [APP/PROC/WEB/0] [OUT] ... 13 common frames omitted

The POJO class is auto-generated:

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.avro</groupId>
                <artifactId>avro-maven-plugin</artifactId>
                <version>${avro.version}</version>
                <executions>
                    <execution>
                        <phase>generate-sources</phase>
                        <goals>
                            <goal>schema</goal>
                        </goals>
                        <configuration>
                            <sourceDirectory>${project.basedir}/src/main/resources/avro</sourceDirectory>
                            <outputDirectory>${project.basedir}/src/main/java/entity/kafka</outputDirectory>
                            <stringType>String</stringType>
                            <enableDecimalLogicalType>true</enableDecimalLogicalType>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
        <finalName>${project.artifactId}</finalName>
    </build>

I have set specific.avro.reader to true, but it does not work.

What is the B class? Why is the data treated as B before conversion? How did it become a B class? There is no such class in my code.
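
On the `B` question: `[B` is not a class from the application. It is the name the JVM gives to the array type `byte[]`, so "Cannot convert from [[B]" reads as "cannot convert from byte[]". That suggests (an inference from the log, not something confirmed here) that the value reached the listener as raw bytes rather than as an object produced by KafkaAvroDeserializer. A small standalone check, where the class name `ByteArrayTypeName` is only illustrative:

```java
// Hypothetical demo class, for illustration only.
final class ByteArrayTypeName {

    // Returns the JVM class name of the given object, as it appears in log messages.
    static String describe(Object payload) {
        return payload.getClass().getName();
    }

    public static void main(String[] args) {
        byte[] payload = new byte[124]; // same length as the payload in the log

        System.out.println(describe(payload));               // [B
        System.out.println(byte[].class.getSimpleName());    // byte[]
    }
}
```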

I tried using this Value class for both producing and consuming. Producing works fine, but consuming does not.

What is going wrong? How can I get the listener to parse the data into that POJO? What should I do?


Update:

I tried adding a consumer configuration like this:

@EnableKafka
@Configuration
public class KafkaConsumerConfiguration {
    @Bean
    public ConsumerFactory<String, Object> consumerFactory() {
        KafkaProperties properties = new KafkaProperties();
        return new DefaultKafkaConsumerFactory<>(properties.buildConsumerProperties(), new StringDeserializer(), new KafkaAvroDeserializer());
    }

    /**
     * This is to deserialize kafka data that has the type:<br>
     * <ul>
     *     <li>Key = String</li>
     *     <li>Value = Avro</li>
     * </ul>
     * @return
     */
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerAvroFactory() {
        ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}

But then the log is empty, because the listener does not receive anything.

When I change the KafkaProperties initialization to @Autowired, this is the error I get:

2024-05-07T09:42:53.844+07:00 [APP/PROC/WEB/0] [OUT] Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition TOPIC-0 at offset 98. If needed, please seek past the record to continue consumption.
2024-05-07T09:42:53.844+07:00 [APP/PROC/WEB/0] [OUT] Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
2024-05-07T09:42:53.844+07:00 [APP/PROC/WEB/0] [OUT] [07-05-2024 09:42:53.844] [DEBUG] [] [o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer] - Commit list: {}
2024-05-07T09:42:53.844+07:00 [APP/PROC/WEB/0] [OUT] [07-05-2024 09:42:53.844] [ERROR] [] [o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer] - Consumer exception
2024-05-07T09:42:53.844+07:00 [APP/PROC/WEB/0] [OUT] java.lang.IllegalStateException: This error handler cannot process 'SerializationException's directly; please consider configuring an 'ErrorHandlingDeserializer' in the value and/or key deserializer
2024-05-07T09:42:53.844+07:00 [APP/PROC/WEB/0] [OUT] at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:145)
2024-05-07T09:42:53.844+07:00 [APP/PROC/WEB/0] [OUT] at org.springframework.kafka.listener.SeekToCurrentErrorHandler.handle(SeekToCurrentErrorHandler.java:113)
2024-05-07T09:42:53.844+07:00 [APP/PROC/WEB/0] [OUT] at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1401)
2024-05-07T09:42:53.844+07:00 [APP/PROC/WEB/0] [OUT] at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1108)
2024-05-07T09:42:53.844+07:00 [APP/PROC/WEB/0] [OUT] at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
2024-05-07T09:42:53.844+07:00 [APP/PROC/WEB/0] [OUT] at java.util.concurrent.FutureTask.run(FutureTask.java:266)
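
For context on "Unknown magic byte!": KafkaAvroDeserializer expects every record in the Confluent wire format, which starts with one magic byte of value 0 followed by a 4-byte big-endian schema ID, then the Avro-encoded body. The error usually means the bytes at that offset do not start with that header, for example because the record was written by a different serializer. Whether that is the case for offset 98 here cannot be told from the log alone. A rough sketch of the header check in plain Java (the class `ConfluentWireFormat` and its methods are hypothetical and are not the Confluent implementation):

```java
import java.nio.ByteBuffer;

// Hypothetical helper that mimics the header check, for illustration only.
final class ConfluentWireFormat {

    static final byte MAGIC_BYTE = 0x0;
    static final int HEADER_LENGTH = 5; // 1 magic byte + 4-byte schema ID

    // True if the payload is long enough and starts with the expected magic byte.
    static boolean hasMagicByte(byte[] payload) {
        return payload != null
                && payload.length >= HEADER_LENGTH
                && payload[0] == MAGIC_BYTE;
    }

    // Reads the schema ID that follows the magic byte (big-endian int).
    static int schemaId(byte[] payload) {
        if (!hasMagicByte(payload)) {
            throw new IllegalArgumentException("Unknown magic byte!");
        }
        return ByteBuffer.wrap(payload, 1, 4).getInt();
    }

    public static void main(String[] args) {
        byte[] framed = {0, 0, 0, 0, 42, 10, 20};   // header for schema ID 42 plus a dummy body
        byte[] plain = "plain text".getBytes();      // no Confluent header

        System.out.println(schemaId(framed));        // 42
        System.out.println(hasMagicByte(plain));     // false
    }
}
```

Dumping the first five bytes of the failing record and comparing them against this layout is one way to see whether the producer side used the Confluent Avro serializer.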
java apache-kafka spring-kafka avro confluent-schema-registry
1 Answer

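This may not really solve the problem of parsing into the POJO, but I decided to take a different route: have the listener receive the value as a GenericRecord: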

@KafkaListener(topics = "TOPIC", groupId = "GROUP_ID"
            , properties = {
                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG + ":org.apache.kafka.common.serialization.StringDeserializer",
                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG + ":io.confluent.kafka.serializers.KafkaAvroDeserializer"
            })
public void listen(GenericRecord message) throws Exception {
    logger.debug("Consumed from kafka {}", message.toString());
}

Now it consumes from the topic without errors, although an extra workaround is needed to parse the data out of the GenericRecord.
