如何配置2个Spring Kafka监听器监听同一个主题以拥有自己的重试和死信主题

问题描述 投票:0回答:2

在 Spring Kafka 中,我有两个 Kafka 监听器收听同一主题。我希望两个侦听器都有自己的重试和死信主题,这样如果一个侦听器发生故障,它就不会重播给另一个侦听器。通过以下配置,他们最终都会使用相同的重试和死信主题

  • 支付主题
  • 付款主题重试
  • 支付主题-dlt
class PaymentListener {
    
    @KafkaListener(topics = ["payment-topic"],)
    fun sendBill(payment: Payment) {
        // ...some logic
    }

    @KafkaListener(topics = ["payment-topic"],)
    fun adjustBalance(payment: Payment) {
        // ...some logic
    }
}

    @Bean
    fun paymentTopicRetryStrategy(
        template: KafkaTemplate<String, ClearingFinishedEvent>,
    ): RetryTopicConfiguration {
        return RetryTopicConfigurationBuilder
            .newInstance()
            .useSingleTopicForSameIntervals()
            .fixedBackOff(3000)
            .maxAttempts(3)
            .includeTopic("payment-topic")
            .dltProcessingFailureStrategy(DltStrategy.ALWAYS_RETRY_ON_ERROR)
            .autoCreateTopics(true, 1, BROKER_DEFAULT)
            .create(template)
    }

我真正想要的是:

  • 支付主题

[sendBill 侦听器使用这些作为重试和死信主题]

  • 发送账单支付主题重试
  • 发送账单支付主题-dlt

[adjustBalance 侦听器使用这些作为重试和死信主题]

  • 调整余额支付主题重试
  • 调整余额支付主题-dlt

我可以编写一个自定义命名策略(https://docs.spring.io/spring-kafka/docs/current/reference/html/#custom-naming-strategies),但侦听器的详细信息不可用我所以无法说得足够具体。

有什么想法吗?

apache-kafka spring-kafka
2个回答
0
投票

不要使用

RetryTopicConfigurationBuilder
(按主题名称过滤),而是在侦听器上使用
@RetryableTopic
;然后您可以使用自定义的
retryTopicSuffix

然后您可以拥有类似

payment-topic-bill-retry
payment-topic-adjust-balance-retry
等的东西


0
投票

我尝试对多个侦听器使用 retryTopicSuffix,但只有最后一个“pn-retry”被调用。有什么建议吗?

springBoot版本=2.7.12 springKafka版本=2.9.10

package com.dmodi;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.DltHandler;
import org.springframework.kafka.annotation.EnableKafkaRetryTopic;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.RetryableTopic;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.retrytopic.FixedDelayStrategy;
import org.springframework.retry.annotation.Backoff;
import org.springframework.scheduling.annotation.EnableScheduling;

@SpringBootApplication
@EnableScheduling
@EnableKafkaRetryTopic
@Slf4j
public class KafkaRetry {
    public static final String TOPIC = "dmoditest";

    public static void main(String[] args) {
        SpringApplication.run(KafkaRetry.class, args);
    }

    @Bean
    public NewTopic mainTopic() {
        return TopicBuilder
                .name(TOPIC)
                .partitions(10)
                .replicas(1)
                .build();
    }

    @RetryableTopic(
            attempts = "3",
            backoff = @Backoff(delayExpression = "5000"),
            fixedDelayTopicStrategy = FixedDelayStrategy.SINGLE_TOPIC,
            autoCreateTopics = "true",
            numPartitions = "5",
            replicationFactor = "1",
            retryTopicSuffix = "-email-retry",
            dltTopicSuffix = "-email-dlt"
    )
    @KafkaListener(topics = TOPIC, concurrency = "3", groupId = "mygroup")
    public void nonBlockingRetriesListen(ConsumerRecord<String, String> record) {
        log.info("Received record ----- : {} {}", record.value(), record.topic());
        throw new RuntimeException("retry");
    }

    @DltHandler
    public void dltHandler(ConsumerRecord<String, String> record) {
        log.info("Received record from DLT: {}", record.value());
    }

    @RetryableTopic(attempts = "3",
            backoff = @Backoff(delayExpression = "5000"),
            fixedDelayTopicStrategy = FixedDelayStrategy.SINGLE_TOPIC,
            autoCreateTopics = "true",
            numPartitions = "5",
            replicationFactor = "1",
            retryTopicSuffix = "-pn-retry",
            dltTopicSuffix = "-pn-dlt"
    )
    @KafkaListener(topics = TOPIC, concurrency = "3", groupId = "mygroup2")
    public void nonBlockingRetriesListen2(ConsumerRecord<String, String> record) {
        log.info("Received record **** : {} {}", record.value(), record.topic());
        throw new RuntimeException("retry");
    }

}

日志:

2023-12-14 13:01:34.510  INFO 18191 --- [ntainer#0-0-C-1] com.dmodi.KafkaRetry                     : Received record ----- : dmodi5 dmoditest
2023-12-14 13:01:34.510  INFO 18191 --- [ntainer#4-0-C-1] com.dmodi.KafkaRetry                     : Received record **** : dmodi5 dmoditest
2023-12-14 13:01:35.022  INFO 18191 --- [ntainer#4-0-C-1] o.a.k.clients.producer.KafkaProducer     : [Producer clientId=producer-1] Instantiated an idempotent producer.
2023-12-14 13:01:35.026  INFO 18191 --- [ntainer#4-0-C-1] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 3.1.2
2023-12-14 13:01:35.026  INFO 18191 --- [ntainer#4-0-C-1] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: f8c67dc3ae0a3265
2023-12-14 13:01:35.026  INFO 18191 --- [ntainer#4-0-C-1] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1702587695026
2023-12-14 13:01:35.030  INFO 18191 --- [ad | producer-1] org.apache.kafka.clients.Metadata        : [Producer clientId=producer-1] Cluster ID: MZRjv5tgRIaOC3zYNz8L_g
2023-12-14 13:01:35.030  INFO 18191 --- [ad | producer-1] o.a.k.c.p.internals.TransactionManager   : [Producer clientId=producer-1] ProducerId set to 2012 with epoch 0
2023-12-14 13:01:35.032  INFO 18191 --- [ad | producer-1] org.apache.kafka.clients.Metadata        : [Producer clientId=producer-1] Resetting the last seen epoch of partition dmoditest-pn-retry-0-0 to 0 since the associated topicId changed from null to eanmWLzASeC1x58llfd6lw
2023-12-14 13:01:35.032  INFO 18191 --- [ad | producer-1] org.apache.kafka.clients.Metadata        : [Producer clientId=producer-1] Resetting the last seen epoch of partition dmoditest-pn-retry-0-1 to 0 since the associated topicId changed from null to eanmWLzASeC1x58llfd6lw
2023-12-14 13:01:35.032  INFO 18191 --- [ad | producer-1] org.apache.kafka.clients.Metadata        : [Producer clientId=producer-1] Resetting the last seen epoch of partition dmoditest-pn-retry-0-4 to 0 since the associated topicId changed from null to eanmWLzASeC1x58llfd6lw
2023-12-14 13:01:35.032  INFO 18191 --- [ad | producer-1] org.apache.kafka.clients.Metadata        : [Producer clientId=producer-1] Resetting the last seen epoch of partition dmoditest-pn-retry-0-2 to 0 since the associated topicId changed from null to eanmWLzASeC1x58llfd6lw
2023-12-14 13:01:35.032  INFO 18191 --- [ad | producer-1] org.apache.kafka.clients.Metadata        : [Producer clientId=producer-1] Resetting the last seen epoch of partition dmoditest-pn-retry-0-3 to 0 since the associated topicId changed from null to eanmWLzASeC1x58llfd6lw
2023-12-14 13:01:35.041  INFO 18191 --- [n-retry-0-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-mygroup2-pn-retry-0-1, groupId=mygroup2-pn-retry-0] Seeking to offset 0 for partition dmoditest-pn-retry-0-0
2023-12-14 13:01:39.637  INFO 18191 --- [n-retry-0-0-C-1] com.dmodi.KafkaRetry                     : Received record **** : dmodi5 dmoditest-pn-retry-0
2023-12-14 13:01:40.149  INFO 18191 --- [ad | producer-1] org.apache.kafka.clients.Metadata        : [Producer clientId=producer-1] Resetting the last seen epoch of partition dmoditest-pn-retry-1-0 to 0 since the associated topicId changed from null to Mb0YnbR5Qu6el3bZ6fKkog
2023-12-14 13:01:40.149  INFO 18191 --- [ad | producer-1] org.apache.kafka.clients.Metadata        : [Producer clientId=producer-1] Resetting the last seen epoch of partition dmoditest-pn-retry-1-1 to 0 since the associated topicId changed from null to Mb0YnbR5Qu6el3bZ6fKkog
2023-12-14 13:01:40.149  INFO 18191 --- [ad | producer-1] org.apache.kafka.clients.Metadata        : [Producer clientId=producer-1] Resetting the last seen epoch of partition dmoditest-pn-retry-1-4 to 0 since the associated topicId changed from null to Mb0YnbR5Qu6el3bZ6fKkog
2023-12-14 13:01:40.149  INFO 18191 --- [ad | producer-1] org.apache.kafka.clients.Metadata        : [Producer clientId=producer-1] Resetting the last seen epoch of partition dmoditest-pn-retry-1-2 to 0 since the associated topicId changed from null to Mb0YnbR5Qu6el3bZ6fKkog
2023-12-14 13:01:40.149  INFO 18191 --- [ad | producer-1] org.apache.kafka.clients.Metadata        : [Producer clientId=producer-1] Resetting the last seen epoch of partition dmoditest-pn-retry-1-3 to 0 since the associated topicId changed from null to Mb0YnbR5Qu6el3bZ6fKkog
2023-12-14 13:01:40.167  INFO 18191 --- [n-retry-0-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-mygroup2-pn-retry-0-1, groupId=mygroup2-pn-retry-0] Seeking to offset 1 for partition dmoditest-pn-retry-0-0
2023-12-14 13:01:40.168  INFO 18191 --- [n-retry-1-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-mygroup2-pn-retry-1-4, groupId=mygroup2-pn-retry-1] Seeking to offset 0 for partition dmoditest-pn-retry-1-0
2023-12-14 13:01:40.171  INFO 18191 --- [n-retry-0-0-C-1] com.dmodi.KafkaRetry                     : Received record **** : dmodi5 dmoditest-pn-retry-0
2023-12-14 13:01:44.726  INFO 18191 --- [n-retry-1-0-C-1] com.dmodi.KafkaRetry                     : Received record **** : dmodi5 dmoditest-pn-retry-1
2023-12-14 13:01:44.743 ERROR 18191 --- [n-retry-1-0-C-1] k.r.DeadLetterPublishingRecovererFactory : Record: topic = dmoditest-pn-retry-1, partition = 0, offset = 0, main topic = dmoditest threw an error at topic dmoditest-pn-retry-1 and won't be retried. Sending to DLT with name dmoditest-pn-dlt.
2023-12-14 13:01:45.235  INFO 18191 --- [ad | producer-1] org.apache.kafka.clients.Metadata        : [Producer clientId=producer-1] Resetting the last seen epoch of partition dmoditest-pn-dlt-0 to 0 since the associated topicId changed from null to KKn33x_mSUW3z3KRdSqNHA
2023-12-14 13:01:45.236  INFO 18191 --- [ad | producer-1] org.apache.kafka.clients.Metadata        : [Producer clientId=producer-1] Resetting the last seen epoch of partition dmoditest-pn-dlt-4 to 0 since the associated topicId changed from null to KKn33x_mSUW3z3KRdSqNHA
2023-12-14 13:01:45.236  INFO 18191 --- [ad | producer-1] org.apache.kafka.clients.Metadata        : [Producer clientId=producer-1] Resetting the last seen epoch of partition dmoditest-pn-dlt-1 to 0 since the associated topicId changed from null to KKn33x_mSUW3z3KRdSqNHA
2023-12-14 13:01:45.236  INFO 18191 --- [ad | producer-1] org.apache.kafka.clients.Metadata        : [Producer clientId=producer-1] Resetting the last seen epoch of partition dmoditest-pn-dlt-2 to 0 since the associated topicId changed from null to KKn33x_mSUW3z3KRdSqNHA
2023-12-14 13:01:45.236  INFO 18191 --- [ad | producer-1] org.apache.kafka.clients.Metadata        : [Producer clientId=producer-1] Resetting the last seen epoch of partition dmoditest-pn-dlt-3 to 0 since the associated topicId changed from null to KKn33x_mSUW3z3KRdSqNHA
2023-12-14 13:01:45.250  INFO 18191 --- [n-retry-1-0-C-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-mygroup2-pn-retry-1-4, groupId=mygroup2-pn-retry-1] Seeking to offset 1 for partition dmoditest-pn-retry-1-0
2023-12-14 13:01:45.251  INFO 18191 --- [#7-pn-dlt-0-C-1] com.dmodi.KafkaRetry                     : Received record from DLT: dmodi5
2023-12-14 13:01:45.254  INFO 18191 --- [n-retry-1-0-C-1] com.dmodi.KafkaRetry                     : Received record **** : dmodi5 dmoditest-pn-retry-1
2023-12-14 13:01:45.255 ERROR 18191 --- [n-retry-1-0-C-1] k.r.DeadLetterPublishingRecovererFactory : Record: topic = dmoditest-pn-retry-1, partition = 0, offset = 1, main topic = dmoditest threw an error at topic dmoditest-pn-retry-1 and won't be retried. Sending to DLT with name dmoditest-pn-dlt.

© www.soinside.com 2019 - 2024. All rights reserved.