我有一个配置为手动确认的 kafka 侦听器。
@KafkaListener(
topics = ["my-topic"],
groupId = "group-id",
containerFactory = "containerFactoryManualAck",
clientIdPrefix = "prefix"
)
fun processUpdateDispatchStatusEvent(message: String, ack: Acknowledgment,
@Header(KafkaHeaders.DELIVERY_ATTEMPT) attempt: Int) {
try {
println("attempt: $attempt")
//logic that might fails
ack.acknowledge();
} catch (e: Exception) {
ack.nack(10000)
}
}
消息已成功重新传送,带有
ack.nack(10000)
,但 DELIVERY_ATTEMPT
标头始终为 1
我认为这是因为
DELIVERY_ATTEMPT
不适合与 Acknowledgment.nack
一起使用,但我不知道如何使用这种方法来跟踪尝试。
有什么建议吗?
Springboot版本是
2.3.10.RELEASE
spring-kafka版本是
2.5.12.RELEASE
首先,旧版本的 Spring Boot 已经失去支持很多年了。因此,您需要处理一些过时的库。新版本中可能有一些修复。
您没有表明您使用:
/**
* Set to true to populate the
* {@link org.springframework.kafka.support.KafkaHeaders#DELIVERY_ATTEMPT} header when
* the error handler or after rollback processor implements
* {@code DeliveryAttemptAware}. There is a small overhead so this is false by
* default.
* @param deliveryAttemptHeader true to populate
* @since 2.5
*/
public void setDeliveryAttemptHeader(boolean deliveryAttemptHeader) {
但否则
KafkaHeaders.DELIVERY_ATTEMPT
就不会是 1
。