我有一个 Spring Kafka 应用程序,它接收 HTTP 请求并将其有效负载发送到 Kafka 主题。我想预见以下不成功的情况:
在这种情况下,应用程序无法无限重试在 Kafka 中发布,因为它应该返回 HTTP 响应:无论成功与否。
我做了什么
delivery.timeout.ms
属性。并以这种方式实现发送到Kafka:
...
@Value("${spring.kafka.producer.properties[delivery.timeout.ms]}")
private Long deliveryTimeoutMs;
public void sendMessage(String data) throws InterruptedException {
try {
kafkaTemplate.send(topic, data).get(deliveryTimeoutMs, MILLISECONDS);
} catch (CancellationException | ExecutionException | TimeoutException e) {
String msg = String.format(UNABLE_TO_PUBLISH_DATA_IN_KAFKA, topic, data);
log.error(msg, e);
throw new KafkaProducerServiceException(msg, e);
} catch (InterruptedException e) {
String msg = String.format(UNABLE_TO_PUBLISH_DATA_IN_KAFKA, topic, data);
log.error(msg, e);
throw e;
}
}
因此,应用程序尝试在 Kafka 中发送数据,等待
delivery.timeout.ms
属性中指定的相同时间段,如果失败,它将返回 500 HTTP 响应。
问题 当我尝试执行以下操作时:
delivery.timeout.ms
后收到 500 响应)然后我看到 来自失败的 HTTP 请求的数据无论如何都已在 Kafka 中发布了 – 即使在
delivery.timeout.ms
之后。甚至在更长的时间(比如 30 分钟)之后。但发送方系统已经收到500响应。如果重试此失败的请求,数据将在 Kafka 主题中重复。我怎样才能避免这种情况?
附注如果您发现我的思路有任何错误,请告诉我。
谢谢!
如果你低于spring kafka 3.0 下面是一个可能的解决方案 希望这有帮助
var future = template.send(producerRecordSuccess).completable();
future.whenComplete((result,ex) -> {
if (ex == null){
//return 200
}
else if(ex != null){
//return 503
}
});