我用 Spring Cloud Streaming 做了一些实验,其中 kafka 是数据源。 需要配置
dlq
,而我的配置不支持它。
所以我打开了开源示例,它确实有效。
spring.cloud.stream.kafka.streams.binder:
configuration:
commit.interval.ms: 1000
default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
default.value.serde: org.apache.kafka.common.serialization.Serdes$IntegerSerde
application.id: dlq-demo-sample
serdeError: sendToDlq
spring.cloud.stream.bindings.process-out-0:
destination: counts
spring.cloud.stream.bindings.process-in-0:
destination: words
spring.cloud.stream.kafka.streams.bindings.process-in-0.consumer:
dlqName: words-count-dlq
valueSerde: org.apache.kafka.common.serialization.Serdes$IntegerSerde
示例使用
<spring-cloud.version>2020.0.2</spring-cloud.version>
和
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.4.4</version>
<relativePath/>
</parent>
我将此库升级到新版本:
<spring-cloud.version>2023.0.0</spring-cloud.version>
和
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.2.2</version>
<relativePath/>
</parent>
更新版本后,通常的场景按预期工作(如果 serde 值为
StringSerde
),但 DLQ 配置被忽略(使用原始 yml
)。
您认为我将示例更新为不兼容的库吗?
...以下 spring 文档 它应该兼容:
|Release Train | Spring Boot Generation |
|---------------------------------------------|
|2023.0.x aka Leyton | 3.2.x |
...
您认为我错过了一些重要的属性以使其与较新的 Spring 版本一起使用吗?
serde-error
) 中的
serdeError: sendToDlq
已被弃用一段时间,并从 4.0.x
版本的活页夹中删除。这有利于deserialization-exception-handler
。有关详细信息,请参阅参考文档:https://docs.spring.io/spring-cloud-stream/reference/kafka/kafka-streams-binder/error-handling.html
您可以使用该新属性并重试吗?我们将更新您上面引用的示例以反映新配置。