在 Spring Kafka 中,我有两个 Kafka 监听器收听同一主题。我希望两个侦听器都有自己的重试和死信主题,这样如果一个侦听器发生故障,它就不会重播给另一个侦听器。通过以下配置,他们最终都会使用相同的重试和死信主题
class PaymentListener {
@KafkaListener(topics = ["payment-topic"],)
fun sendBill(payment: Payment) {
// ...some logic
}
@KafkaListener(topics = ["payment-topic"],)
fun adjustBalance(payment: Payment) {
// ...some logic
}
}
@Bean
fun paymentTopicRetryStrategy(
template: KafkaTemplate<String, ClearingFinishedEvent>,
): RetryTopicConfiguration {
return RetryTopicConfigurationBuilder
.newInstance()
.useSingleTopicForSameIntervals()
.fixedBackOff(3000)
.maxAttempts(3)
.includeTopic("payment-topic")
.dltProcessingFailureStrategy(DltStrategy.ALWAYS_RETRY_ON_ERROR)
.autoCreateTopics(true, 1, BROKER_DEFAULT)
.create(template)
}
我真正想要的是:
[sendBill 侦听器使用这些作为重试和死信主题]
[adjustBalance 侦听器使用这些作为重试和死信主题]
我可以编写一个自定义命名策略(https://docs.spring.io/spring-kafka/docs/current/reference/html/#custom-naming-strategies),但侦听器的详细信息不可用我所以无法说得足够具体。
有什么想法吗?
不要使用
RetryTopicConfigurationBuilder
(按主题名称过滤),而是在侦听器上使用 @RetryableTopic
;然后您可以使用自定义的 retryTopicSuffix
。
然后您可以拥有类似
payment-topic-bill-retry
和 payment-topic-adjust-balance-retry
等的东西
我尝试对多个侦听器使用 retryTopicSuffix,但只有最后一个“pn-retry”被调用。有什么建议吗?
springBoot版本=2.7.12 springKafka版本=2.9.10
package com.dmodi;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.DltHandler;
import org.springframework.kafka.annotation.EnableKafkaRetryTopic;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.RetryableTopic;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.retrytopic.FixedDelayStrategy;
import org.springframework.retry.annotation.Backoff;
import org.springframework.scheduling.annotation.EnableScheduling;
@SpringBootApplication
@EnableScheduling
@EnableKafkaRetryTopic
@Slf4j
public class KafkaRetry {
public static final String TOPIC = "dmoditest";
public static void main(String[] args) {
SpringApplication.run(KafkaRetry.class, args);
}
@Bean
public NewTopic mainTopic() {
return TopicBuilder
.name(TOPIC)
.partitions(10)
.replicas(1)
.build();
}
@RetryableTopic(
attempts = "3",
backoff = @Backoff(delayExpression = "5000"),
fixedDelayTopicStrategy = FixedDelayStrategy.SINGLE_TOPIC,
autoCreateTopics = "true",
numPartitions = "5",
replicationFactor = "1",
retryTopicSuffix = "-email-retry",
dltTopicSuffix = "-email-dlt"
)
@KafkaListener(topics = TOPIC, concurrency = "3", groupId = "mygroup")
public void nonBlockingRetriesListen(ConsumerRecord<String, String> record) {
log.info("Received record ----- : {} {}", record.value(), record.topic());
throw new RuntimeException("retry");
}
@DltHandler
public void dltHandler(ConsumerRecord<String, String> record) {
log.info("Received record from DLT: {}", record.value());
}
@RetryableTopic(attempts = "3",
backoff = @Backoff(delayExpression = "5000"),
fixedDelayTopicStrategy = FixedDelayStrategy.SINGLE_TOPIC,
autoCreateTopics = "true",
numPartitions = "5",
replicationFactor = "1",
retryTopicSuffix = "-pn-retry",
dltTopicSuffix = "-pn-dlt"
)
@KafkaListener(topics = TOPIC, concurrency = "3", groupId = "mygroup2")
public void nonBlockingRetriesListen2(ConsumerRecord<String, String> record) {
log.info("Received record **** : {} {}", record.value(), record.topic());
throw new RuntimeException("retry");
}
}
日志:
2023-12-14 13:01:34.510 INFO 18191 --- [ntainer#0-0-C-1] com.dmodi.KafkaRetry : Received record ----- : dmodi5 dmoditest
2023-12-14 13:01:34.510 INFO 18191 --- [ntainer#4-0-C-1] com.dmodi.KafkaRetry : Received record **** : dmodi5 dmoditest
2023-12-14 13:01:35.022 INFO 18191 --- [ntainer#4-0-C-1] o.a.k.clients.producer.KafkaProducer : [Producer clientId=producer-1] Instantiated an idempotent producer.
2023-12-14 13:01:35.026 INFO 18191 --- [ntainer#4-0-C-1] o.a.kafka.common.utils.AppInfoParser : Kafka version: 3.1.2
2023-12-14 13:01:35.026 INFO 18191 --- [ntainer#4-0-C-1] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: f8c67dc3ae0a3265
2023-12-14 13:01:35.026 INFO 18191 --- [ntainer#4-0-C-1] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1702587695026
2023-12-14 13:01:35.030 INFO 18191 --- [ad | producer-1] org.apache.kafka.clients.Metadata : [Producer clientId=producer-1] Cluster ID: MZRjv5tgRIaOC3zYNz8L_g
2023-12-14 13:01:35.030 INFO 18191 --- [ad | producer-1] o.a.k.c.p.internals.TransactionManager : [Producer clientId=producer-1] ProducerId set to 2012 with epoch 0
2023-12-14 13:01:35.032 INFO 18191 --- [ad | producer-1] org.apache.kafka.clients.Metadata : [Producer clientId=producer-1] Resetting the last seen epoch of partition dmoditest-pn-retry-0-0 to 0 since the associated topicId changed from null to eanmWLzASeC1x58llfd6lw
2023-12-14 13:01:35.032 INFO 18191 --- [ad | producer-1] org.apache.kafka.clients.Metadata : [Producer clientId=producer-1] Resetting the last seen epoch of partition dmoditest-pn-retry-0-1 to 0 since the associated topicId changed from null to eanmWLzASeC1x58llfd6lw
2023-12-14 13:01:35.032 INFO 18191 --- [ad | producer-1] org.apache.kafka.clients.Metadata : [Producer clientId=producer-1] Resetting the last seen epoch of partition dmoditest-pn-retry-0-4 to 0 since the associated topicId changed from null to eanmWLzASeC1x58llfd6lw
2023-12-14 13:01:35.032 INFO 18191 --- [ad | producer-1] org.apache.kafka.clients.Metadata : [Producer clientId=producer-1] Resetting the last seen epoch of partition dmoditest-pn-retry-0-2 to 0 since the associated topicId changed from null to eanmWLzASeC1x58llfd6lw
2023-12-14 13:01:35.032 INFO 18191 --- [ad | producer-1] org.apache.kafka.clients.Metadata : [Producer clientId=producer-1] Resetting the last seen epoch of partition dmoditest-pn-retry-0-3 to 0 since the associated topicId changed from null to eanmWLzASeC1x58llfd6lw
2023-12-14 13:01:35.041 INFO 18191 --- [n-retry-0-0-C-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-mygroup2-pn-retry-0-1, groupId=mygroup2-pn-retry-0] Seeking to offset 0 for partition dmoditest-pn-retry-0-0
2023-12-14 13:01:39.637 INFO 18191 --- [n-retry-0-0-C-1] com.dmodi.KafkaRetry : Received record **** : dmodi5 dmoditest-pn-retry-0
2023-12-14 13:01:40.149 INFO 18191 --- [ad | producer-1] org.apache.kafka.clients.Metadata : [Producer clientId=producer-1] Resetting the last seen epoch of partition dmoditest-pn-retry-1-0 to 0 since the associated topicId changed from null to Mb0YnbR5Qu6el3bZ6fKkog
2023-12-14 13:01:40.149 INFO 18191 --- [ad | producer-1] org.apache.kafka.clients.Metadata : [Producer clientId=producer-1] Resetting the last seen epoch of partition dmoditest-pn-retry-1-1 to 0 since the associated topicId changed from null to Mb0YnbR5Qu6el3bZ6fKkog
2023-12-14 13:01:40.149 INFO 18191 --- [ad | producer-1] org.apache.kafka.clients.Metadata : [Producer clientId=producer-1] Resetting the last seen epoch of partition dmoditest-pn-retry-1-4 to 0 since the associated topicId changed from null to Mb0YnbR5Qu6el3bZ6fKkog
2023-12-14 13:01:40.149 INFO 18191 --- [ad | producer-1] org.apache.kafka.clients.Metadata : [Producer clientId=producer-1] Resetting the last seen epoch of partition dmoditest-pn-retry-1-2 to 0 since the associated topicId changed from null to Mb0YnbR5Qu6el3bZ6fKkog
2023-12-14 13:01:40.149 INFO 18191 --- [ad | producer-1] org.apache.kafka.clients.Metadata : [Producer clientId=producer-1] Resetting the last seen epoch of partition dmoditest-pn-retry-1-3 to 0 since the associated topicId changed from null to Mb0YnbR5Qu6el3bZ6fKkog
2023-12-14 13:01:40.167 INFO 18191 --- [n-retry-0-0-C-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-mygroup2-pn-retry-0-1, groupId=mygroup2-pn-retry-0] Seeking to offset 1 for partition dmoditest-pn-retry-0-0
2023-12-14 13:01:40.168 INFO 18191 --- [n-retry-1-0-C-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-mygroup2-pn-retry-1-4, groupId=mygroup2-pn-retry-1] Seeking to offset 0 for partition dmoditest-pn-retry-1-0
2023-12-14 13:01:40.171 INFO 18191 --- [n-retry-0-0-C-1] com.dmodi.KafkaRetry : Received record **** : dmodi5 dmoditest-pn-retry-0
2023-12-14 13:01:44.726 INFO 18191 --- [n-retry-1-0-C-1] com.dmodi.KafkaRetry : Received record **** : dmodi5 dmoditest-pn-retry-1
2023-12-14 13:01:44.743 ERROR 18191 --- [n-retry-1-0-C-1] k.r.DeadLetterPublishingRecovererFactory : Record: topic = dmoditest-pn-retry-1, partition = 0, offset = 0, main topic = dmoditest threw an error at topic dmoditest-pn-retry-1 and won't be retried. Sending to DLT with name dmoditest-pn-dlt.
2023-12-14 13:01:45.235 INFO 18191 --- [ad | producer-1] org.apache.kafka.clients.Metadata : [Producer clientId=producer-1] Resetting the last seen epoch of partition dmoditest-pn-dlt-0 to 0 since the associated topicId changed from null to KKn33x_mSUW3z3KRdSqNHA
2023-12-14 13:01:45.236 INFO 18191 --- [ad | producer-1] org.apache.kafka.clients.Metadata : [Producer clientId=producer-1] Resetting the last seen epoch of partition dmoditest-pn-dlt-4 to 0 since the associated topicId changed from null to KKn33x_mSUW3z3KRdSqNHA
2023-12-14 13:01:45.236 INFO 18191 --- [ad | producer-1] org.apache.kafka.clients.Metadata : [Producer clientId=producer-1] Resetting the last seen epoch of partition dmoditest-pn-dlt-1 to 0 since the associated topicId changed from null to KKn33x_mSUW3z3KRdSqNHA
2023-12-14 13:01:45.236 INFO 18191 --- [ad | producer-1] org.apache.kafka.clients.Metadata : [Producer clientId=producer-1] Resetting the last seen epoch of partition dmoditest-pn-dlt-2 to 0 since the associated topicId changed from null to KKn33x_mSUW3z3KRdSqNHA
2023-12-14 13:01:45.236 INFO 18191 --- [ad | producer-1] org.apache.kafka.clients.Metadata : [Producer clientId=producer-1] Resetting the last seen epoch of partition dmoditest-pn-dlt-3 to 0 since the associated topicId changed from null to KKn33x_mSUW3z3KRdSqNHA
2023-12-14 13:01:45.250 INFO 18191 --- [n-retry-1-0-C-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-mygroup2-pn-retry-1-4, groupId=mygroup2-pn-retry-1] Seeking to offset 1 for partition dmoditest-pn-retry-1-0
2023-12-14 13:01:45.251 INFO 18191 --- [#7-pn-dlt-0-C-1] com.dmodi.KafkaRetry : Received record from DLT: dmodi5
2023-12-14 13:01:45.254 INFO 18191 --- [n-retry-1-0-C-1] com.dmodi.KafkaRetry : Received record **** : dmodi5 dmoditest-pn-retry-1
2023-12-14 13:01:45.255 ERROR 18191 --- [n-retry-1-0-C-1] k.r.DeadLetterPublishingRecovererFactory : Record: topic = dmoditest-pn-retry-1, partition = 0, offset = 1, main topic = dmoditest threw an error at topic dmoditest-pn-retry-1 and won't be retried. Sending to DLT with name dmoditest-pn-dlt.