我使用
spring-cloud-stream-binder-kafka
和 spring-cloud-stream
以功能方式配置 kafka 流。
我的云依赖项来自
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>2023.0.0</version>
<type>pom</type>
<scope>import</scope>
</dependency>
主要依赖项是
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
</dependency>
我的yml配置:
kafka-conf:
# server: localhost:9101
server: localhost:9092
group: test-streams
consume-topic-report-details: report_details_topic
report-details-dlq: report_details_dlq
produce-topic-report-details: report_details_topic_redirect
schema: http://localhost:8081
spring:
application:
name: ${kafka-conf.group}-streaming
cloud:
function:
definition: reportDetails
stream:
bindings:
reportDetails-in-0:
contentType: application/*+avro
destination: ${kafka-conf.consume-topic-report-details}
group: ${kafka-conf.group}-streaming
reportDetails-out-0:
contentType: application/*+avro
destination: ${kafka-conf.produce-topic-report-details}
kafka:
streams:
binder:
deserialization-exception-handler: sendToDlq
configuration:
commit.interval.ms: 100
default:
key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
value.serde: io.confluent.kafka.streams.serdes.avro.GenericAvroSerde
bindings:
reportDetails-in-0:
consumer:
dlqName: ${kafka-conf.report-details-dlq}
binder:
brokers: ${kafka-conf.server}
schemaRegistryClient:
endpoint: ${kafka-conf.schema}
使用
io.confluent.kafka.serializers.KafkaAvroSerializer
序列化的消费消息。
我希望我的服务自动对流中的每条消息使用 io.confluent.kafka.serializers.KafkaAvroDeserializer
,但我的 yml
配置中似乎忽略了某些内容。
结果我的蒸汽失败了
@Bean
Function<ReportDetails, ReportDetails> reportDetails() {
return data -> {
log.info("input reportDetails: {}", data);
return data;
};
}
有例外
Caused by: java.lang.ClassCastException: class [B cannot be cast to class com.vl.model.avro.ReportDetails ([B is in module java.base of loader 'bootstrap'; com.vl.model.avro.ReportDetails is in unnamed module of loader 'app')
at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.invokeFunctionAndEnrichResultIfNecessary(SimpleFunctionRegistry.java:958)
另一方面,我尝试直接反序列化它(它有效)。
@Bean
Function<byte[], byte[]> filterAbsence() {
return dto -> {
SchemaRegistryClient schemaRegistryClient = new CachedSchemaRegistryClient("http://schema-server:8081", 5);
try (KafkaAvroDeserializer deserializer = new KafkaAvroDeserializer(schemaRegistryClient)) {
AbsenceDto obj = deserializer.deserialize("report_details_topic", dto);
log.info("received message: {}", obj);
}
log.info("received message: {}", dto);
return dto;
};
}
我怀疑我的 DQL 配置也不正确。 当我的消费者失败时,我希望消息将被重定向到 DLQ,但其中一个是空的。
yml
配置以使反序列化自动工作?您检查过应用程序日志吗?
StreamsConfig
已在启动时登录,因此您可以验证您的配置是否已被拾取,看起来没有。
我不是 spring 人,也不是 yml 专家,但你为什么使用:
default:
key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
value.serde: io.confluent.kafka.streams.serdes.avro.GenericAvroSerde
这些配置被称为
default.key.serde
和 default.value.serde
所以不应该是
default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
default.value.serde: io.confluent.kafka.streams.serdes.avro.GenericAvroSerde
类似于你设置的
commit.interal.ms
?您可以验证它是否检测到提交间隔更改吗?