Spring Cloud Stream does not automatically deserialize Kafka messages with the given configuration


I use spring-cloud-stream-binder-kafka and spring-cloud-stream to configure Kafka Streams in the functional style. My cloud dependencies come from:

            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>2023.0.0</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>

The main dependencies are:

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
        </dependency>
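
For context, the functional style with the Kafka Streams binder means exposing a bean typed over KStream; below is a minimal sketch (the class and bean names are illustrative, and ReportDetails is the Avro-generated type). A function over plain payload types is served by the message-channel Kafka binder instead.

    // Minimal sketch of a functional-style Kafka Streams processor.
    // The Kafka Streams binder binds functions typed over KStream;
    // class/bean names here are illustrative only.
    import java.util.function.Function;

    import org.apache.kafka.streams.kstream.KStream;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    import com.vl.model.avro.ReportDetails;

    @Configuration
    public class ReportDetailsTopology {

        @Bean
        public Function<KStream<String, ReportDetails>, KStream<String, ReportDetails>> reportDetailsStream() {
            // peek() logs each record without modifying the stream
            return stream -> stream.peek((key, value) ->
                    System.out.println("input reportDetails: " + value));
        }
    }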

My yml configuration:

kafka-conf:
#  server: localhost:9101
  server: localhost:9092
  group: test-streams
  consume-topic-report-details: report_details_topic
  report-details-dlq: report_details_dlq
  produce-topic-report-details: report_details_topic_redirect
  schema: http://localhost:8081

spring:
  application:
    name: ${kafka-conf.group}-streaming
  cloud:
    function:
      definition: reportDetails
    stream:
      bindings:
        reportDetails-in-0:
          contentType: application/*+avro
          destination: ${kafka-conf.consume-topic-report-details}
          group: ${kafka-conf.group}-streaming
        reportDetails-out-0:
          contentType: application/*+avro
          destination: ${kafka-conf.produce-topic-report-details}
      kafka:
        streams:
          binder:
            deserialization-exception-handler: sendToDlq
            configuration:
              commit.interval.ms: 100
              default:
                key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
                value.serde: io.confluent.kafka.streams.serdes.avro.GenericAvroSerde
          bindings:
            reportDetails-in-0:
              consumer:
                dlqName: ${kafka-conf.report-details-dlq}
        binder:
          brokers: ${kafka-conf.server}
          schemaRegistryClient:
            endpoint: ${kafka-conf.schema}

Problem details

The consumed messages are serialized with io.confluent.kafka.serializers.KafkaAvroSerializer. I expected my service to apply io.confluent.kafka.serializers.KafkaAvroDeserializer to every message in the stream automatically, but something in my yml configuration is apparently being ignored.

As a result, my stream fails

    @Bean
    Function<ReportDetails, ReportDetails> reportDetails() {
        return data -> {
            log.info("input reportDetails: {}", data);
            return data;
        };
    }

with the exception:

Caused by: java.lang.ClassCastException: class [B cannot be cast to class com.vl.model.avro.ReportDetails ([B is in module java.base of loader 'bootstrap'; com.vl.model.avro.ReportDetails is in unnamed module of loader 'app')
    at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.invokeFunctionAndEnrichResultIfNecessary(SimpleFunctionRegistry.java:958)

On the other hand, I tried deserializing it directly, and that works:

    @Bean
    Function<byte[], byte[]> filterAbsence() {
        return dto -> {
            SchemaRegistryClient schemaRegistryClient = new CachedSchemaRegistryClient("http://schema-server:8081", 5);
            try (KafkaAvroDeserializer deserializer = new KafkaAvroDeserializer(schemaRegistryClient)) {
                // deserialize() returns Object, so an explicit cast is required
                AbsenceDto obj = (AbsenceDto) deserializer.deserialize("report_details_topic", dto);
                log.info("received message: {}", obj);
            }
            log.info("received message: {}", dto);
            return dto;
        };
    }

I also suspect my DLQ configuration is incorrect. When my consumer fails, I expect the message to be redirected to the DLQ, but it remains empty.
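
For reference, my DLQ settings sit under the Kafka Streams binder section (kafka.streams.bindings). The message-channel Kafka binder has its own DLQ switch; if that binder is the one actually serving the binding, the equivalent would look roughly like this sketch:

spring:
  cloud:
    stream:
      kafka:
        bindings:
          reportDetails-in-0:
            consumer:
              # enableDlq/dlqName are the message-channel binder's DLQ settings
              enableDlq: true
              dlqName: ${kafka-conf.report-details-dlq}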

Questions:

  1. How can I fix my yml configuration so that deserialization works automatically?
  2. How can I fix my DLQ configuration, or get confirmation that it is correct?
1 Answer

Have you checked the application logs?

The StreamsConfig is logged at startup, so you can verify whether your configuration was picked up; it looks like it was not.

I am not a Spring person, nor a yml expert, but why do you use:

default:
    key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
    value.serde: io.confluent.kafka.streams.serdes.avro.GenericAvroSerde

These configs are named default.key.serde and default.value.serde, so shouldn't it be:

default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
default.value.serde: io.confluent.kafka.streams.serdes.avro.GenericAvroSerde

similar to how you already set commit.interval.ms? Can you verify whether it picks up the commit-interval change?
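
If that flattening is the issue, the streams-binder block in the posted yml would become, as a sketch (the rest of the configuration unchanged):

spring:
  cloud:
    stream:
      kafka:
        streams:
          binder:
            deserialization-exception-handler: sendToDlq
            configuration:
              commit.interval.ms: 100
              default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
              default.value.serde: io.confluent.kafka.streams.serdes.avro.GenericAvroSerde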
