Spring Cloud Stream: handling poison pills with a Kafka DLT


I followed this recipe for handling deserialization errors: https://github.com/spring-cloud/spring-cloud-stream-samples/blob/main/recipes/recipe-3-handling-deserialization-errors-dlq-kafka.adoc

I created the beans mentioned in the recipe above like this:

import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.config.ListenerContainerCustomizer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaOperations;
import org.springframework.kafka.listener.AbstractMessageListenerContainer;
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;

@Configuration
@Slf4j
public class ErrorHandlingConfig {

    // Attach the error handler to every listener container created by the binder.
    @Bean
    public ListenerContainerCustomizer<AbstractMessageListenerContainer<byte[], byte[]>> customizer(SeekToCurrentErrorHandler errorHandler) {
        return (container, dest, group) -> {
            container.setErrorHandler(errorHandler);
        };
    }

    // Seek back to the failed record and hand it to the recoverer once retries are exhausted.
    @Bean
    public SeekToCurrentErrorHandler errorHandler(DeadLetterPublishingRecoverer deadLetterPublishingRecoverer) {
        return new SeekToCurrentErrorHandler(deadLetterPublishingRecoverer);
    }

    // Publish failed records to a dead letter topic.
    @Bean
    public DeadLetterPublishingRecoverer publisher(KafkaOperations bytesTemplate) {
        return new DeadLetterPublishingRecoverer(bytesTemplate);
    }
}

Configuration file:

spring:
  cloud:
    stream:
      default:
        producer:
          useNativeEncoding: true
        consumer:
          useNativeDecoding: true
      bindings:
        myInboundRoute:
          destination: some-destination.1
          group: a-custom-group
        myOutboundRoute:
          destination: some-destination.2
      kafka:
        binder:
          brokers: localhost
          defaultBrokerPort: 9092
          configuration:
            application:
              security: PLAINTEXT
        bindings:
          myInboundRoute:
            consumer:
              autoCommitOffset: true
              startOffset: latest
              enableDlq: true
              dlqName: my-dql.poison
              dlqProducerProperties:
                configuration:
                  value.serializer: myapp.serde.MyCustomSerializer
              configuration:
                value.deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
                spring.deserializer.value.delegate.class: myapp.serde.MyCustomSerializer
          myOutboundRoute:
            producer:
              configuration:
                key.serializer: org.apache.kafka.common.serialization.StringSerializer
                value.serializer: myapp.serde.MyCustomSerializer

I expected the DLT to be called my-dql.poison. That topic is in fact created fine, but a second topic named some-destination.1.DLT is also created automatically. Why does it create that one in addition to the one I named with dlqName in the configuration?

What am I doing wrong? When I poll for messages, they are sitting in the auto-created some-destination.1.DLT rather than in my dlqName topic.

spring-kafka spring-cloud-stream spring-cloud-stream-binder-kafka kafka-dlt
1 Answer
  1. If you configure the STCEH (SeekToCurrentErrorHandler) in the container, you should not also configure DLT handling on the binding. Also set maxAttempts=1 to disable retries (a binding sketch follows further below).

  2. You need to configure a destination resolver on the DLPR (DeadLetterPublishingRecoverer) so that it publishes to a different name; see the sketch after the quoted constructor below.

    /**
     * Create an instance with the provided template and destination resolving function,
     * that receives the failed consumer record and the exception and returns a
     * {@link TopicPartition}. If the partition in the {@link TopicPartition} is less than
     * 0, no partition is set when publishing to the topic.
     * @param template the {@link KafkaOperations} to use for publishing.
     * @param destinationResolver the resolving function.
     */
    public DeadLetterPublishingRecoverer(KafkaOperations<? extends Object, ? extends Object> template,
            BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> destinationResolver) {
        this(Collections.singletonMap(Object.class, template), destinationResolver);
    }
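
As an illustration (not part of the original answer), the publisher bean from the question's ErrorHandlingConfig could be rewritten roughly like this, hard-coding the my-dql.poison name that the question sets as dlqName; the only extra import needed is org.apache.kafka.common.TopicPartition. A partition of -1 means no partition is set when publishing, per the javadoc quoted above.

    @Bean
    public DeadLetterPublishingRecoverer publisher(KafkaOperations bytesTemplate) {
        // Route every failed record to the custom DLT instead of the default "<topic>.DLT".
        return new DeadLetterPublishingRecoverer(bytesTemplate,
                (consumerRecord, exception) -> new TopicPartition("my-dql.poison", -1));
    }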

See https://docs.spring.io/spring-kafka/docs/current/reference/html/#dead-letters
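
For the first point, here is a rough sketch (not from the original answer) of how the question's inbound binding could look with retries disabled and the binder-level DLQ settings removed; all names are taken from the question's configuration:

spring:
  cloud:
    stream:
      bindings:
        myInboundRoute:
          destination: some-destination.1
          group: a-custom-group
          consumer:
            maxAttempts: 1   # disable binder retries; failures go straight to the container error handler
      kafka:
        bindings:
          myInboundRoute:
            consumer:
              autoCommitOffset: true
              startOffset: latest
              # enableDlq, dlqName and dlqProducerProperties removed: dead-lettering is
              # now handled by the DeadLetterPublishingRecoverer
              configuration:
                value.deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
                spring.deserializer.value.delegate.class: myapp.serde.MyCustomSerializer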

There is an open issue about configuring the DLPR with the DLT name from the binding:

https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/1031
