Kafka messages are sent to a single topic instead of n retry topics and a DLT


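I'm trying to implement n retry topics with a DLT, but all messages are pushed to a single topic, test-topic-retry-0, which ends up with 3 duplicate records. I expected something like this:

  • test-topic-retry-0 -> 1 message after the initial failure
  • test-topic-retry-1 -> 1 message after the first retry
  • test-topic-dlt -> 1 message after all retries have failed

Instead, Kafka seems to push all messages to the same topic.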
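
To make the expected layout concrete, here is a minimal sketch in plain Java with no Spring dependency. `expectedTopics` is a hypothetical helper, not a Spring Kafka API. It assumes index-based suffixes (as with SUFFIX_WITH_INDEX_VALUE) and that the first attempt happens on the main topic, so maxAttempts(3) would give two retry topics plus the DLT.

```java
import java.util.ArrayList;
import java.util.List;

class RetryTopicNames {

    // Hypothetical helper (not part of Spring Kafka): lists the topics one would
    // expect for index-suffixed retry topics, given the total number of attempts.
    static List<String> expectedTopics(String mainTopic, String retrySuffix,
                                       String dltSuffix, int maxAttempts) {
        List<String> topics = new ArrayList<>();
        topics.add(mainTopic);
        // Assumption: the first attempt runs on the main topic,
        // so there are maxAttempts - 1 retry topics.
        for (int i = 0; i < maxAttempts - 1; i++) {
            topics.add(mainTopic + retrySuffix + "-" + i);
        }
        topics.add(mainTopic + dltSuffix);
        return topics;
    }

    public static void main(String[] args) {
        expectedTopics("test-topic", "-retry", "-dlt", 3).forEach(System.out::println);
    }
}
```

With the values from the configuration in this question, the sketch lists test-topic, test-topic-retry-0, test-topic-retry-1 and test-topic-dlt.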

Kafka configuration:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory() {
  ConcurrentKafkaListenerContainerFactory<String, Object> factory =
      new ConcurrentKafkaListenerContainerFactory<>();
  factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
  factory.setBatchListener(false);
  factory.setConcurrency(1);
  factory.getContainerProperties().setAckMode(AckMode.RECORD);

  return factory;
}

private Map<String, Object> consumerConfigs() {
  Map<String, Object> props = new HashMap<>();
  props.put(BOOTSTRAP_SERVERS_CONFIG, kafkaBootStrapAddress);
  props.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  props.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  props.put(ENABLE_AUTO_COMMIT_CONFIG, false);
  props.put(AUTO_OFFSET_RESET_CONFIG, "earliest");
  props.put(MAX_POLL_RECORDS_CONFIG, 100);
  props.put(HEARTBEAT_INTERVAL_MS_CONFIG, 2000);
  props.put(SESSION_TIMEOUT_MS_CONFIG, 10000);
  return props;
}

@Bean
public KafkaAdmin kadmin() {
  Map<String, Object> configs = new HashMap<>();
  configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootStrapAddress);
  return new KafkaAdmin(configs);
}

@Bean
public ProducerFactory<String, Object> producerFactory() {
  Map<String, Object> configProps = new HashMap<>();
  configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootStrapAddress);
  configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
  configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

  return new DefaultKafkaProducerFactory<>(configProps);
}

@Bean
public KafkaTemplate<String, Object> prodTemplate() {
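  // Use the producerFactory() bean defined above (no producerFactoryString() method is shown in the post).
  return new KafkaTemplate<>(producerFactory());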
}

@Bean
public RetryTopicConfiguration retryTopicConfig(KafkaTemplate<String, Object> template,
    ConcurrentKafkaListenerContainerFactory<String, Object> factory) {
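  // NOTE: statement fragment as posted; the beginning of this statement is missing in the original.
  // template.getProducerFactory().getConfigurationProperties());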

  return RetryTopicConfigurationBuilder
      .newInstance()
      .exponentialBackoff(2000, 5, Long.MAX_VALUE)
      .maxAttempts(3)
      .timeoutAfter(-1)
      .autoCreateTopicsWith(3, (short) 3)
      .dltProcessingFailureStrategy(DltStrategy.FAIL_ON_ERROR)
      .setTopicSuffixingStrategy(TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE)
      .retryTopicSuffix("-retry")
      .dltSuffix("-dlt")
      .listenerFactory(factory)
      .create(template);
}

Listener:

@KafkaListener(topics = "test-topic")
public void onMessage(ConsumerRecord<String, String> r) {
    // Always fail so that every record goes through the retry chain.
    throw new RuntimeException("test");
}

@DltHandler
public void handleDlt(ConsumerRecord<String, String> r) {
    log.error("test dlt");
}
apache-kafka kafka-consumer-api spring-kafka dead-letter kafka-dlt
1 Answer

I just copied your code almost exactly, and it works as expected for me.

However, I had to add .dltHandlerMethod:

return RetryTopicConfigurationBuilder
        .newInstance()
        .exponentialBackoff(2000, 5, Long.MAX_VALUE)
        .maxAttempts(3)
        .timeoutAfter(-1)
        .autoCreateTopicsWith(3, (short) 1)
        .dltProcessingFailureStrategy(DltStrategy.FAIL_ON_ERROR)
        .setTopicSuffixingStrategy(TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE)
        .retryTopicSuffix("-retry")
        .dltSuffix("-dlt")
        .listenerFactory(factory)
        .dltHandlerMethod(Listener.class, "handleDlt")
        .create(template);

@Component
class Listener {

    private static final Logger log = LoggerFactory.getLogger(Listener.class);

    @KafkaListener(id = "so69453282", topics = "test-topic")
    public void onMessage(ConsumerRecord<String, String> r) {
        log.info(r.topic());
        throw new RuntimeException("test");
    }

    @DltHandler
    public void handleDlt(ConsumerRecord<String, String> r) {
        log.error("test dlt");
    }

}

2021-10-05 13:03:12,451 [so69453282-0-C-1] test-topic
2021-10-05 13:03:14,485 [so69453282-retry-0-0-C-1] test-topic-retry-0
2021-10-05 13:03:24,523 [so69453282-retry-1-0-C-1] test-topic-retry-1
2021-10-05 13:03:25,031 [so69453282-dlt-0-C-1] test dlt
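
The gaps between the log timestamps above (about 2 s, then about 10 s) look consistent with exponentialBackoff(2000, 5, Long.MAX_VALUE). The following is a rough plain-Java sketch of that arithmetic, not Spring Kafka's implementation. `retryDelaysMs` is a hypothetical helper, and it assumes each delay is min(initial * multiplier^i, maxDelay) with maxAttempts - 1 retries.

```java
import java.util.ArrayList;
import java.util.List;

class BackoffDelays {

    // Hypothetical helper: delay before each retry, assuming
    // delay_i = min(initial * multiplier^i, maxDelay) and maxAttempts - 1 retries.
    static List<Long> retryDelaysMs(long initialMs, double multiplier,
                                    long maxDelayMs, int maxAttempts) {
        List<Long> delays = new ArrayList<>();
        double current = initialMs;
        for (int i = 0; i < maxAttempts - 1; i++) {
            // Cap at maxDelayMs; comparing as double avoids long overflow.
            delays.add((long) Math.min(current, (double) maxDelayMs));
            current *= multiplier;
        }
        return delays;
    }

    public static void main(String[] args) {
        // Values taken from the configuration in this thread.
        System.out.println(retryDelaysMs(2000, 5, Long.MAX_VALUE, 3)); // prints [2000, 10000]
    }
}
```

Under these assumptions the record waits roughly 2 s before test-topic-retry-0 and roughly 10 s before test-topic-retry-1, which lines up with the 13:03:12 -> 13:03:14 -> 13:03:24 sequence in the log.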