带有 spring-kafka 的 Kafka 死信队列 (DLQ)

问题描述 投票:0回答:1

在 Spring Boot 2.0 应用程序中使用 spring-kafka 2.1.x 实现死信队列 (DLQ) 概念的最佳方法是什么,以将某些 bean 的 @KafkaListener 方法处理失败的所有消息发送到一些预定义的 Kafka DLQ 主题并且不会丢失单个消息?

因此消耗的 Kafka 记录是:

  1. 处理成功,
  2. 处理失败,发送至DLQ主题,
  3. 处理失败,没有发送到DLQ主题(由于意外问题),因此将再次被监听器消费。

我尝试使用 ErrorHandler 的自定义实现创建侦听器容器,使用 KafkaTemplate 发送记录无法处理到 DLQ 主题。使用禁用的自动提交和RECORDAckMode。

spring.kafka.enable-auto-ack=false
spring.kafka.listener.ack-mode=RECORD

@Configuration
public class KafkaConfig {
    @Bean
    ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = ...
        ...
        factory.getContainerProperties().setErrorHandler(dlqErrorHandler);
        return factory;
    }
}

@Component
public class DlqErrorHandler implements ErrorHandler {

    @Autowired
    private KafkaTemplate<Object, Object> kafkaTemplate;

    @Value("${dlqTopic}")
    private String dlqTopic;

    @Override
    public void handle(Exception thrownException, ConsumerRecord<?, ?> record) {
        log.error("Error, sending to DLQ...");
        kafkaTemplate.send(dlqTopic, record.key(), record.value());
    }
}

看来这个实现并不能保证项目#3。如果 DlqErrorHandler 中抛出异常,记录将不会再次被监听器消耗。

使用事务侦听器容器有帮助吗?

factory.getContainerProperties().setTransactionManager(kafkaTransactionManager);

有没有什么方便的方法来使用 Spring Kafka 实现 DLQ 概念?

更新2018/03/28

感谢 Gary Russell 的回答,我能够通过实现 DlqErrorHandler 来实现所需的行为,如下所示

@Configuration
public class KafkaConfig {
    @Bean
    ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = ...
        ...
        factory.getContainerProperties().setAckOnError(false);
        factory.getContainerProperties().setErrorHandler(dlqErrorHandler);
        return factory;
    }
}

@Component
public class DlqErrorHandler implements ContainerAwareErrorHandler {
    ...
    @Override
    public void handle(Exception thrownException, list<ConsumerRecord<?, ?> records, Consumer<?, ?> consumer, MessageListenerContainer container) {
        Consumerrecord<?, ? record = records.get(0);
        try {
            kafkaTemplate.send("dlqTopic", record.key, record.value());
            consumer.seek(new TopicPartition(record.topic(), record.partition()), record.offset() + 1);
            // Other records may be from other partitions, so seek to current offset for other partitions too
            // ...
        } catch (Exception e) {
            consumer.seek(new TopicPartition(record.topic(), record.partition()), record.offset());
            // Other records may be from other partitions, so seek to current offset for other partitions too
            // ...
            throw new KafkaException("Seek to current after exception", thrownException);
        }
    }
}

这样如果消费者轮询返回 3 条记录 (1, 2, 3) 并且第二条记录无法处理:

  • 1 将被处理
  • 2 将无法处理并发送至 DLQ
  • 3 感谢消费者寻求 record.offset() + 1,它将被传递给监听者

如果发送到 DLQ 失败,消费者会寻求 record.offset() 并且记录将被重新传递给监听器(并且发送到 DLQ 可能会被淘汰)。

更新2021/04/30

从 Spring Kafka 2.7.0 开始原生支持非阻塞重试和死信主题。

参见示例:

https://github.com/evgeniy-khist/spring-kafka-non-blocking-retries-and-dlt

重试通常应该是非阻塞的(在单独的主题中完成)并且延迟:

    不干扰实时交通;
  • 不增加呼叫数量,实质上是垃圾邮件发送不良请求;
  • 为了可观察性(获取重试次数和其他元数据)。 使用 Kafka 实现非阻塞重试和 DLT 功能通常需要设置额外的主题并创建和配置相应的侦听器。
java spring-boot apache-kafka spring-kafka dead-letter
1个回答
9
投票
参见

SeekToCurrentErrorHandler


当发生异常时,它会寻找消费者,以便在下一次轮询时重新传送所有未处理的记录。

您可以使用相同的技术(例如子类)写入 DLQ,如果 DLQ 写入失败,则查找当前偏移量(以及其他未处理的偏移量);如果 DLQ 写入成功,则仅查找剩余记录。

编辑

DeadLetterPublishingRecoverer

是在这个答案发布几个月后添加的。

https://docs.spring.io/spring-kafka/api/org/springframework/kafka/listener/DeadLetterPublishingRecoverer.html

© www.soinside.com 2019 - 2024. All rights reserved.